diff --git a/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java b/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java index c61de155164..aa6570d20cd 100644 --- a/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java +++ b/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java @@ -75,12 +75,12 @@ public void removeTransactionDroppedListener(final long listenerIdentifier) { @Override public long addSyncStatusListener(final SyncStatusListener syncStatusListener) { - return syncState.addSyncStatusListener(syncStatusListener); + return syncState.subscribeSyncStatus(syncStatusListener); } @Override public void removeSyncStatusListener(final long listenerIdentifier) { - syncState.removeSyncStatusListener(listenerIdentifier); + syncState.unsubscribeSyncStatus(listenerIdentifier); } private static PropagatedBlockContext blockPropagatedContext( diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java index 3f5d108c27a..b8268cb02d8 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java @@ -31,7 +31,7 @@ public class SyncingSubscriptionService { public SyncingSubscriptionService( final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) { this.subscriptionManager = subscriptionManager; - synchronizer.observeSyncStatus(this::sendSyncingToMatchingSubscriptions); + synchronizer.subscribeSyncStatus(this::sendSyncingToMatchingSubscriptions); } private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) { diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java index 13987413ca1..9a3c5ecb449 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java @@ -54,7 +54,7 @@ public class SyncingSubscriptionServiceTest { public void before() { final ArgumentCaptor captor = ArgumentCaptor.forClass(SyncStatusListener.class); - when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L); + when(synchronizer.subscribeSyncStatus(captor.capture())).thenReturn(1L); new SyncingSubscriptionService(subscriptionManager, synchronizer); syncStatusListener = captor.getValue(); } diff --git a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinator.java b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinator.java index c5b73062525..2d6ca3266af 100644 --- a/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinator.java +++ b/ethereum/blockcreation/src/main/java/org/hyperledger/besu/ethereum/blockcreation/AbstractMiningCoordinator.java @@ -55,7 +55,7 @@ public AbstractMiningCoordinator( this.blockchain = blockchain; this.syncState = syncState; this.blockchain.observeBlockAdded(this); - syncState.addInSyncListener(this::inSyncChanged); + syncState.subscribeInSync(this::inSyncChanged); } @Override diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java index f77320a2122..7e035ecec56 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Synchronizer.java @@ -22,6 +22,9 @@ /** Provides an interface to block synchronization processes. */ public interface Synchronizer { + // Default tolerance used to determine whether or not this node is "in sync" + long DEFAULT_IN_SYNC_TOLERANCE = 5; + void start(); void stop(); @@ -32,7 +35,43 @@ public interface Synchronizer { */ Optional getSyncStatus(); - long observeSyncStatus(final BesuEvents.SyncStatusListener listener); + long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener); + + boolean unsubscribeSyncStatus(long observerId); + + /** + * Add a listener that will be notified when this node's sync status changes. A node is considered + * in-sync if the local chain height is no more than {@code DEFAULT_IN_SYNC_TOLERANCE} behind the + * highest estimated remote chain height. + * + * @param listener The callback to invoke when the sync status changes + * @return A subscription id that can be used to unsubscribe from these events + */ + long subscribeInSync(final InSyncListener listener); + + /** + * Add a listener that will be notified when this node's sync status changes. A node is considered + * in-sync if the local chain height is no more than {@code syncTolerance} behind the highest + * estimated remote chain height. + * + * @param listener The callback to invoke when the sync status changes + * @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of + * zero means that the node is considered in-sync only when the local chain height is greater + * than or equal to the best estimated remote chain height. + * @return A subscription id that can be used to unsubscribe from these events + */ + long subscribeInSync(final InSyncListener listener, final long syncTolerance); + + /** + * Unsubscribe from in sync events. + * + * @param listenerId The id returned when subscribing + * @return {@code true} if a subscription was cancelled + */ + boolean unsubscribeInSync(final long listenerId); - boolean removeObserver(long observerId); + @FunctionalInterface + interface InSyncListener { + void onInSyncStatusChange(boolean newSyncStatus); + } } diff --git a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java index 020aafa509d..a15d0fe7485 100644 --- a/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java +++ b/ethereum/core/src/test-support/java/org/hyperledger/besu/ethereum/core/BlockDataGenerator.java @@ -203,11 +203,14 @@ public List blockSequence( } public Block genesisBlock() { - final BlockOptions options = - new BlockOptions() - .setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER) - .setStateRoot(Hash.EMPTY_TRIE_HASH) - .setParentHash(Hash.ZERO); + return genesisBlock(new BlockOptions()); + } + + public Block genesisBlock(final BlockOptions options) { + options + .setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER) + .setStateRoot(Hash.EMPTY_TRIE_HASH) + .setParentHash(Hash.ZERO); return block(options); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainHeadEstimate.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainHeadEstimate.java new file mode 100644 index 00000000000..462cc31eb26 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainHeadEstimate.java @@ -0,0 +1,46 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.ethereum.eth.manager; + +import org.hyperledger.besu.ethereum.chain.ChainHead; +import org.hyperledger.besu.util.uint.UInt256; + +public interface ChainHeadEstimate { + + UInt256 getEstimatedTotalDifficulty(); + + long getEstimatedHeight(); + + /** + * Returns true if this chain state represents a chain that is "better" than the chain represented + * by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or + * heavier than the supplied {@code chainToCheck}. + * + * @param chainToCheck The chain being compared. + * @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}. + */ + default boolean chainIsBetterThan(final ChainHead chainToCheck) { + return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck); + } + + default boolean hasHigherDifficultyThan(final ChainHead chainToCheck) { + return getEstimatedTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0; + } + + default boolean hasLongerChainThan(final ChainHead chainToCheck) { + return getEstimatedHeight() > chainToCheck.getHeight(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainState.java index d8e076760d0..c8671509bee 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainState.java @@ -14,7 +14,6 @@ */ package org.hyperledger.besu.ethereum.eth.manager; -import org.hyperledger.besu.ethereum.chain.ChainHead; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; import org.hyperledger.besu.util.Subscribers; @@ -22,7 +21,7 @@ import com.google.common.base.MoreObjects; -public class ChainState { +public class ChainState implements ChainHeadEstimate { // The best block by total difficulty that we know about private final BestBlock bestBlock = new BestBlock(); // The highest block that we've seen @@ -40,14 +39,20 @@ public void removeEstimatedHeightListener(final long listenerId) { estimatedHeightListeners.unsubscribe(listenerId); } + public ChainStateSnapshot getSnapshot() { + return new ChainStateSnapshot(getEstimatedTotalDifficulty(), getEstimatedHeight()); + } + public boolean hasEstimatedHeight() { return estimatedHeightKnown; } + @Override public long getEstimatedHeight() { return estimatedHeight; } + @Override public UInt256 getEstimatedTotalDifficulty() { return bestBlock.getTotalDifficulty(); } @@ -106,26 +111,6 @@ public void updateHeightEstimate(final long blockNumber) { } } - /** - * Returns true if this chain state represents a chain that is "better" than the chain represented - * by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or - * heavier than the supplied {@code chainToCheck}. - * - * @param chainToCheck The chain being compared. - * @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}. - */ - public boolean chainIsBetterThan(final ChainHead chainToCheck) { - return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck); - } - - private boolean hasHigherDifficultyThan(final ChainHead chainToCheck) { - return bestBlock.getTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0; - } - - private boolean hasLongerChainThan(final ChainHead chainToCheck) { - return estimatedHeight > chainToCheck.getHeight(); - } - @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainStateSnapshot.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainStateSnapshot.java new file mode 100644 index 00000000000..bdfe8848355 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/ChainStateSnapshot.java @@ -0,0 +1,38 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.ethereum.eth.manager; + +import org.hyperledger.besu.util.uint.UInt256; + +public class ChainStateSnapshot implements ChainHeadEstimate { + private final UInt256 totalDifficulty; + private final long chainHeight; + + public ChainStateSnapshot(final UInt256 totalDifficulty, final long chainHeight) { + this.totalDifficulty = totalDifficulty; + this.chainHeight = chainHeight; + } + + @Override + public UInt256 getEstimatedTotalDifficulty() { + return totalDifficulty; + } + + @Override + public long getEstimatedHeight() { + return chainHeight; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java index 3adf56f28ef..1fd0174587e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/EthPeer.java @@ -326,10 +326,16 @@ public boolean hasSeenBlock(final Hash hash) { return knownBlocks.contains(hash); } + /** @return This peer's current chain state. */ public ChainState chainState() { return chainHeadState; } + /** @return A read-only snapshot of this peer's current {@code chainState} } */ + public ChainHeadEstimate chainStateSnapshot() { + return chainHeadState.getSnapshot(); + } + public void registerHeight(final Hash blockHash, final long height) { chainHeadState.update(blockHash, height); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 51004654d8d..09c65634aff 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -34,7 +34,6 @@ import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.util.ExceptionUtils; -import org.hyperledger.besu.util.Subscribers; import java.nio.file.Path; import java.time.Clock; @@ -51,7 +50,6 @@ public class DefaultSynchronizer implements Synchronizer { private final Optional maybePruner; private final SyncState syncState; private final AtomicBoolean running = new AtomicBoolean(false); - private final Subscribers syncStatusListeners = Subscribers.create(); private final BlockPropagationManager blockPropagationManager; private final Optional> fastSyncDownloader; private final FullSyncDownloader fullSyncDownloader; @@ -126,7 +124,6 @@ private TrailingPeerRequirements calculateTrailingPeerRequirements() { public void start() { if (running.compareAndSet(false, true)) { LOG.info("Starting synchronizer."); - syncState.addSyncStatusListener(this::syncStatusCallback); blockPropagationManager.start(); if (fastSyncDownloader.isPresent()) { fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult); @@ -187,17 +184,28 @@ public Optional getSyncStatus() { } @Override - public long observeSyncStatus(final SyncStatusListener listener) { + public long subscribeSyncStatus(final SyncStatusListener listener) { checkNotNull(listener); - return syncStatusListeners.subscribe(listener); + return syncState.subscribeSyncStatus(listener); } @Override - public boolean removeObserver(final long observerId) { - return syncStatusListeners.unsubscribe(observerId); + public boolean unsubscribeSyncStatus(final long subscriberId) { + return syncState.unsubscribeSyncStatus(subscriberId); } - private void syncStatusCallback(final SyncStatus status) { - syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status)); + @Override + public long subscribeInSync(final InSyncListener listener) { + return syncState.subscribeInSync(listener); + } + + @Override + public long subscribeInSync(final InSyncListener listener, final long syncTolerance) { + return syncState.subscribeInSync(listener, syncTolerance); + } + + @Override + public boolean unsubscribeInSync(final long listenerId) { + return syncState.unsubscribeSyncStatus(listenerId); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/InSyncTracker.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/InSyncTracker.java new file mode 100644 index 00000000000..9e7032d4de6 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/InSyncTracker.java @@ -0,0 +1,107 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.besu.ethereum.eth.sync.state; + +import org.hyperledger.besu.ethereum.chain.ChainHead; +import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener; +import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate; + +import java.util.Optional; +import java.util.function.Consumer; + +/** Tracks the sync status of this node within the specified {@code syncTolerance}. */ +class InSyncTracker { + private InSyncState state = InSyncState.UNKNOWN; + // If the local chain is no more than {@code syncTolerance} behind the estimated highest chain, + // then the tracker considers this local node to be in sync + private final long syncTolerance; + + private final InSyncListener listener; + + private InSyncTracker(final InSyncListener listener, final long syncTolerance) { + this.listener = listener; + this.syncTolerance = syncTolerance; + } + + public static InSyncTracker create(final InSyncListener listener, final long syncTolerance) { + return new InSyncTracker(listener, syncTolerance); + } + + public static boolean isInSync( + final ChainHead localChain, final ChainHeadEstimate remoteChain, final long syncTolerance) { + final boolean inSyncByHeight = + remoteChain.getEstimatedHeight() - localChain.getHeight() <= syncTolerance; + return inSyncByHeight || !remoteChain.chainIsBetterThan(localChain); + } + + synchronized void checkState( + final ChainHead localChain, + final Optional syncTargetChain, + final Optional bestPeerChain) { + final boolean currentSyncStatus = + currentSyncStatus(localChain, syncTargetChain, bestPeerChain).orElse(true); + + final InSyncState newState = InSyncState.fromInSync(currentSyncStatus); + if (state != newState) { + // Sync status has changed, notify listener + state = newState; + state.ifKnown(listener::onInSyncStatusChange); + } + } + + private Optional currentSyncStatus( + final ChainHead localChain, + final Optional syncTargetChain, + final Optional bestPeerChain) { + final Optional inSyncWithSyncTarget = + syncTargetChain.map(remote -> isInSync(localChain, remote)); + final Optional inSyncWithBestPeer = + bestPeerChain.map(remote -> isInSync(localChain, remote)); + // If we're out of sync with either peer, we're out of sync + if (inSyncWithSyncTarget.isPresent() && !inSyncWithSyncTarget.get()) { + return Optional.of(false); + } + if (inSyncWithBestPeer.isPresent() && !inSyncWithBestPeer.get()) { + return Optional.of(false); + } + // Otherwise, if either peer is in sync, we're in sync + return inSyncWithSyncTarget.or(() -> inSyncWithBestPeer); + } + + private boolean isInSync(final ChainHead localChain, final ChainHeadEstimate remoteChain) { + return isInSync(localChain, remoteChain, syncTolerance); + } + + private enum InSyncState { + UNKNOWN(Optional.empty()), + IN_SYNC(Optional.of(true)), + OUT_OF_SYNC(Optional.of(false)); + + private final Optional inSync; + + InSyncState(final Optional inSync) { + this.inSync = inSync; + } + + static InSyncState fromInSync(final boolean inSync) { + return inSync ? IN_SYNC : OUT_OF_SYNC; + } + + public void ifKnown(final Consumer handler) { + inSync.ifPresent(handler::accept); + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java index 560d7055877..96ef389e35b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java @@ -18,23 +18,30 @@ import org.hyperledger.besu.ethereum.chain.ChainHead; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.SyncStatus; +import org.hyperledger.besu.ethereum.core.Synchronizer; +import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener; +import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.util.Subscribers; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; public class SyncState { - private static final long SYNC_TOLERANCE = 5; + private final Blockchain blockchain; private final EthPeers ethPeers; - private final Subscribers inSyncListeners = Subscribers.create(); + private final AtomicLong inSyncSubscriberId = new AtomicLong(); + private final Map inSyncTrackers = new ConcurrentHashMap<>(); private final Subscribers syncStatusListeners = Subscribers.create(); private volatile long chainHeightListenerId; private volatile Optional syncTarget = Optional.empty(); @@ -79,16 +86,47 @@ private void publishReorg() { syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus)); } - public void addInSyncListener(final InSyncListener observer) { - inSyncListeners.subscribe(observer); + /** + * Add a listener that will be notified when this node's sync status changes. A node is considered + * in-sync if the local chain height is no more than {@code SYNC_TOLERANCE} behind the highest + * estimated remote chain height. + * + * @param listener The callback to invoke when the sync status changes + * @return An {@code Unsubscriber} that can be used to stop listening for these events + */ + public long subscribeInSync(final InSyncListener listener) { + return subscribeInSync(listener, Synchronizer.DEFAULT_IN_SYNC_TOLERANCE); + } + + /** + * Add a listener that will be notified when this node's sync status changes. A node is considered + * in-sync if the local chain height is no more than {@code syncTolerance} behind the highest + * estimated remote chain height. + * + * @param listener The callback to invoke when the sync status changes + * @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of + * zero means that the node is considered in-sync only when the local chain height is greater + * than or equal to the best estimated remote chain height. + * @return An {@code Unsubscriber} that can be used to stop listening for these events + */ + public long subscribeInSync(final InSyncListener listener, final long syncTolerance) { + final InSyncTracker inSyncTracker = InSyncTracker.create(listener, syncTolerance); + final long id = inSyncSubscriberId.incrementAndGet(); + inSyncTrackers.put(id, inSyncTracker); + + return id; } - public long addSyncStatusListener(final SyncStatusListener observer) { - return syncStatusListeners.subscribe(observer); + public boolean unsubscribeInSync(final long subscriberId) { + return inSyncTrackers.remove(subscriberId) != null; } - public void removeSyncStatusListener(final long listenerId) { - syncStatusListeners.unsubscribe(listenerId); + public long subscribeSyncStatus(final SyncStatusListener listener) { + return syncStatusListeners.subscribe(listener); + } + + public boolean unsubscribeSyncStatus(final long listenerId) { + return syncStatusListeners.unsubscribe(listenerId); } public SyncStatus syncStatus() { @@ -107,29 +145,44 @@ public void setSyncTarget(final EthPeer peer, final BlockHeader commonAncestor) } public boolean isInSync() { - return isInSync(SYNC_TOLERANCE); + return isInSync(Synchronizer.DEFAULT_IN_SYNC_TOLERANCE); } public boolean isInSync(final long syncTolerance) { + return isInSync( + getLocalChainHead(), getSyncTargetChainHead(), getBestPeerChainHead(), syncTolerance); + } + + private boolean isInSync( + final ChainHead localChain, + final Optional syncTargetChain, + final Optional bestPeerChain, + final long syncTolerance) { // Sync target may be temporarily empty while we switch sync targets during a sync, so // check both the sync target and our best peer to determine if we're in sync or not - return isInSyncWithTarget(syncTolerance) && isInSyncWithBestPeer(syncTolerance); + return isInSync(localChain, syncTargetChain, syncTolerance) + && isInSync(localChain, bestPeerChain, syncTolerance); } - private boolean isInSyncWithTarget(final long syncTolerance) { - return syncTarget - .map(t -> t.estimatedTargetHeight() - blockchain.getChainHeadBlockNumber() <= syncTolerance) + private boolean isInSync( + final ChainHead localChain, + final Optional remoteChain, + final long syncTolerance) { + return remoteChain + .map(remoteState -> InSyncTracker.isInSync(localChain, remoteState, syncTolerance)) .orElse(true); } - private boolean isInSyncWithBestPeer(final long syncTolerance) { - final ChainHead chainHead = blockchain.getChainHead(); - return ethPeers - .bestPeerWithHeightEstimate() - .filter(peer -> peer.chainState().chainIsBetterThan(chainHead)) - .map(EthPeer::chainState) - .map(chainState -> chainState.getEstimatedHeight() - chainHead.getHeight() <= syncTolerance) - .orElse(true); + private ChainHead getLocalChainHead() { + return blockchain.getChainHead(); + } + + private Optional getSyncTargetChainHead() { + return syncTarget.map(SyncTarget::peer).map(EthPeer::chainStateSnapshot); + } + + private Optional getBestPeerChainHead() { + return ethPeers.bestPeerWithHeightEstimate().map(EthPeer::chainStateSnapshot); } public void disconnectSyncTarget(final DisconnectReason reason) { @@ -166,18 +219,20 @@ public long bestChainHeight(final long localChainHeight) { } private synchronized void checkInSync() { - final boolean currentInSync = isInSync(); + ChainHead localChain = getLocalChainHead(); + Optional syncTargetChain = getSyncTargetChainHead(); + Optional bestPeerChain = getBestPeerChainHead(); + final boolean currentInSync = isInSync(localChain, syncTargetChain, bestPeerChain, 0); if (lastInSync.compareAndSet(!currentInSync, currentInSync)) { if (!currentInSync) { // when we fall out of sync change our starting block - startingBlock = blockchain.getChainHeadBlockNumber(); + startingBlock = localChain.getHeight(); } - inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync)); } - } - @FunctionalInterface - public interface InSyncListener { - void onSyncStatusChanged(boolean newSyncStatus); + inSyncTrackers + .values() + .forEach( + (syncTracker) -> syncTracker.checkState(localChain, syncTargetChain, bestPeerChain)); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java index 9af33988f3f..49515ac8215 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java @@ -16,32 +16,39 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; -import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; -import org.hyperledger.besu.ethereum.chain.Blockchain; -import org.hyperledger.besu.ethereum.chain.ChainHead; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; -import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.InMemoryStorageProvider; import org.hyperledger.besu.ethereum.core.SyncStatus; +import org.hyperledger.besu.ethereum.core.Synchronizer; +import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.eth.manager.ChainHeadEstimate; import org.hyperledger.besu.ethereum.eth.manager.ChainState; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; +import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.util.uint.UInt256; -import java.util.Collections; +import java.util.List; import java.util.Optional; import org.junit.Before; @@ -50,52 +57,61 @@ public class SyncStateTest { - private static final long OUR_CHAIN_HEAD_NUMBER = 500; - private static final UInt256 OUR_CHAIN_DIFFICULTY = UInt256.of(500); - private static final long TARGET_CHAIN_DELTA = 100; + private static final UInt256 standardDifficultyPerBlock = UInt256.of(1L); + private static final long OUR_CHAIN_HEAD_NUMBER = 20; + private static final UInt256 OUR_CHAIN_DIFFICULTY = + standardDifficultyPerBlock.times(OUR_CHAIN_HEAD_NUMBER); + private static final long TARGET_CHAIN_DELTA = 20; private static final long TARGET_CHAIN_HEIGHT = OUR_CHAIN_HEAD_NUMBER + TARGET_CHAIN_DELTA; - private static final UInt256 TARGET_DIFFICULTY = OUR_CHAIN_DIFFICULTY.plus(TARGET_CHAIN_DELTA); + private static final UInt256 TARGET_DIFFICULTY = + standardDifficultyPerBlock.times(TARGET_CHAIN_HEIGHT); - private final Blockchain blockchain = mock(Blockchain.class); - private final EthPeers ethPeers = mock(EthPeers.class); - private final SyncState.InSyncListener inSyncListener = mock(SyncState.InSyncListener.class); + private final InSyncListener inSyncListener = mock(InSyncListener.class); + private final InSyncListener inSyncListenerExact = mock(InSyncListener.class); private final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class); - private final EthPeer syncTargetPeer = mock(EthPeer.class); - private final ChainState syncTargetPeerChainState = spy(new ChainState()); - private final EthPeer otherPeer = mock(EthPeer.class); - private final ChainState otherPeerChainState = spy(new ChainState()); + + private final BlockDataGenerator gen = new BlockDataGenerator(1); + private final Block genesisBlock = + gen.genesisBlock(new BlockOptions().setDifficulty(UInt256.ZERO)); + private final MutableBlockchain blockchain = + InMemoryStorageProvider.createInMemoryBlockchain(genesisBlock); + + private EthProtocolManager ethProtocolManager; + private EthPeers ethPeers; + private RespondingEthPeer syncTargetPeer; + private RespondingEthPeer otherPeer; private SyncState syncState; - private BlockAddedObserver blockAddedObserver; @Before public void setUp() { - final ArgumentCaptor captor = - ArgumentCaptor.forClass(BlockAddedObserver.class); + ethProtocolManager = + EthProtocolManagerTestUtil.create( + blockchain, InMemoryStorageProvider.createInMemoryWorldStateArchive()); + ethPeers = spy(ethProtocolManager.ethContext().getEthPeers()); + syncTargetPeer = createPeer(TARGET_DIFFICULTY, TARGET_CHAIN_HEIGHT); + otherPeer = createPeer(UInt256.ZERO, 0); - final ChainHead ourChainHead = - new ChainHead(Hash.ZERO, OUR_CHAIN_DIFFICULTY, OUR_CHAIN_HEAD_NUMBER); + advanceLocalChain(OUR_CHAIN_HEAD_NUMBER); - when(blockchain.observeBlockAdded(captor.capture())).thenReturn(1L); - when(blockchain.getChainHeadBlockNumber()).thenReturn(OUR_CHAIN_HEAD_NUMBER); - when(blockchain.getChainHead()).thenReturn(ourChainHead); - when(syncTargetPeer.chainState()).thenReturn(syncTargetPeerChainState); - when(otherPeer.chainState()).thenReturn(otherPeerChainState); syncState = new SyncState(blockchain, ethPeers); - blockAddedObserver = captor.getValue(); - syncState.addInSyncListener(inSyncListener); - syncState.addSyncStatusListener(syncStatusListener); + syncState.subscribeInSync(inSyncListener); + syncState.subscribeInSync(inSyncListenerExact, 0); + syncState.subscribeSyncStatus(syncStatusListener); } @Test public void isInSync_noPeers() { + otherPeer.disconnect(DisconnectReason.REQUESTED); + syncTargetPeer.disconnect(DisconnectReason.REQUESTED); + syncState.clearSyncTarget(); assertThat(syncState.isInSync()).isTrue(); } @Test public void isInSync_singlePeerWithWorseChainBetterHeight() { - updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, OUR_CHAIN_DIFFICULTY.minus(1L)); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(false).when(otherPeerChainState).chainIsBetterThan(any()); + updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, OUR_CHAIN_DIFFICULTY.minus(1L)); + final EthPeer peer = mockWorseChain(otherPeer.getEthPeer()); + doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.syncTarget()).isEmpty(); // Sanity check assertThat(syncState.isInSync()).isTrue(); @@ -105,9 +121,9 @@ public void isInSync_singlePeerWithWorseChainBetterHeight() { @Test public void isInSync_singlePeerWithWorseChainWorseHeight() { updateChainState( - otherPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L)); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(false).when(otherPeerChainState).chainIsBetterThan(any()); + otherPeer.getEthPeer(), OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L)); + final EthPeer peer = mockWorseChain(otherPeer.getEthPeer()); + doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.syncTarget()).isEmpty(); // Sanity check assertThat(syncState.isInSync()).isTrue(); @@ -116,9 +132,9 @@ public void isInSync_singlePeerWithWorseChainWorseHeight() { @Test public void isInSync_singlePeerWithBetterChainWorseHeight() { - updateChainState(otherPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, TARGET_DIFFICULTY); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(true).when(otherPeerChainState).chainIsBetterThan(any()); + updateChainState(otherPeer.getEthPeer(), OUR_CHAIN_HEAD_NUMBER - 1L, TARGET_DIFFICULTY); + final EthPeer peer = mockBetterChain(otherPeer.getEthPeer()); + doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.syncTarget()).isEmpty(); // Sanity check assertThat(syncState.isInSync()).isTrue(); @@ -127,9 +143,9 @@ public void isInSync_singlePeerWithBetterChainWorseHeight() { @Test public void isInSync_singlePeerWithBetterChainBetterHeight() { - updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(true).when(otherPeerChainState).chainIsBetterThan(any()); + updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); + final EthPeer peer = mockBetterChain(otherPeer.getEthPeer()); + doReturn(Optional.of(peer)).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.syncTarget()).isEmpty(); // Sanity check assertThat(syncState.isInSync()).isFalse(); @@ -140,8 +156,8 @@ public void isInSync_singlePeerWithBetterChainBetterHeight() { @Test public void isInSync_syncTargetWithBetterHeight() { - updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); - syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L)); + otherPeer.disconnect(DisconnectReason.REQUESTED); + setupOutOfSyncState(); assertThat(syncState.syncTarget()).isPresent(); // Sanity check assertThat(syncState.isInSync()).isFalse(); @@ -152,8 +168,10 @@ public void isInSync_syncTargetWithBetterHeight() { @Test public void isInSync_syncTargetWithWorseHeight() { - updateChainState(syncTargetPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, TARGET_DIFFICULTY); - syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L)); + otherPeer.disconnect(DisconnectReason.REQUESTED); + final long heightDifference = 20L; + advanceLocalChain(TARGET_CHAIN_HEIGHT + heightDifference); + setupOutOfSyncState(); assertThat(syncState.syncTarget()).isPresent(); // Sanity check assertThat(syncState.isInSync()).isTrue(); @@ -162,11 +180,9 @@ public void isInSync_syncTargetWithWorseHeight() { @Test public void isInSync_outOfSyncWithTargetAndOutOfSyncWithBestPeer() { - updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); - syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L)); - updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(true).when(otherPeerChainState).chainIsBetterThan(any()); + setupOutOfSyncState(); + updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); + doReturn(Optional.of(otherPeer.getEthPeer())).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.isInSync()).isFalse(); assertThat(syncState.isInSync(0)).isFalse(); @@ -177,112 +193,431 @@ public void isInSync_outOfSyncWithTargetAndOutOfSyncWithBestPeer() { @Test public void isInSync_inSyncWithTargetOutOfSyncWithBestPeer() { + setupOutOfSyncState(); + advanceLocalChain(TARGET_CHAIN_HEIGHT); + final long heightDifference = 20L; updateChainState( - syncTargetPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L)); - syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L)); - updateChainState(otherPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(true).when(otherPeerChainState).chainIsBetterThan(any()); + otherPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + heightDifference, + TARGET_DIFFICULTY.plus(heightDifference)); + doReturn(Optional.of(otherPeer.getEthPeer())).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.isInSync()).isFalse(); assertThat(syncState.isInSync(0)).isFalse(); - assertThat(syncState.isInSync(TARGET_CHAIN_DELTA - 1)).isFalse(); - assertThat(syncState.isInSync(TARGET_CHAIN_DELTA)).isTrue(); - assertThat(syncState.isInSync(TARGET_CHAIN_DELTA + 1)).isTrue(); + assertThat(syncState.isInSync(heightDifference - 1)).isFalse(); + assertThat(syncState.isInSync(heightDifference)).isTrue(); + assertThat(syncState.isInSync(heightDifference + 1)).isTrue(); } @Test public void isInSync_inSyncWithTargetInSyncWithBestPeer() { - updateChainState( - syncTargetPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L)); - syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L)); - updateChainState( - otherPeerChainState, OUR_CHAIN_HEAD_NUMBER - 1L, OUR_CHAIN_DIFFICULTY.minus(1L)); - when(ethPeers.bestPeerWithHeightEstimate()).thenReturn(Optional.of(otherPeer)); - doReturn(false).when(otherPeerChainState).chainIsBetterThan(any()); + setupOutOfSyncState(); + advanceLocalChain(TARGET_CHAIN_HEIGHT); + updateChainState(otherPeer.getEthPeer(), TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); + doReturn(Optional.of(otherPeer.getEthPeer())).when(ethPeers).bestPeerWithHeightEstimate(); assertThat(syncState.isInSync()).isTrue(); assertThat(syncState.isInSync(0)).isTrue(); } @Test - public void shouldSwitchToInSyncWhenSyncTargetCleared() { + public void shouldSwitchToInSyncWhenNoBetterPeersAreAvailable() { setupOutOfSyncState(); + otherPeer.disconnect(DisconnectReason.REQUESTED); + syncTargetPeer.disconnect(DisconnectReason.REQUESTED); syncState.clearSyncTarget(); - verify(inSyncListener).onSyncStatusChanged(true); + verify(inSyncListener).onInSyncStatusChange(true); + verify(inSyncListenerExact).onInSyncStatusChange(true); verifyNoMoreInteractions(inSyncListener); + verifyNoMoreInteractions(inSyncListenerExact); } @Test public void shouldBecomeInSyncWhenOurBlockchainCatchesUp() { setupOutOfSyncState(); - when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT); - blockAddedObserver.onBlockAdded( - BlockAddedEvent.createForHeadAdvancement( - new Block( - targetBlockHeader(), - new BlockBody(Collections.emptyList(), Collections.emptyList()))), - blockchain); + // Update to just within the default sync threshold + advanceLocalChain(TARGET_CHAIN_HEIGHT - Synchronizer.DEFAULT_IN_SYNC_TOLERANCE); + // We should register as in-sync with default tolerance, out-of-sync with exact tolerance + assertThat(syncState.isInSync()).isTrue(); + assertThat(syncState.isInSync(0)).isFalse(); + verify(inSyncListener).onInSyncStatusChange(true); + verify(inSyncListenerExact, never()).onInSyncStatusChange(true); + + // Advance one more block + advanceLocalChain(TARGET_CHAIN_HEIGHT - Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1); + // We should register as in-sync with default tolerance, out-of-sync with exact tolerance + assertThat(syncState.isInSync()).isTrue(); + assertThat(syncState.isInSync(0)).isFalse(); + verifyNoMoreInteractions(inSyncListener); + verify(inSyncListenerExact, never()).onInSyncStatusChange(true); + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + // We should register as in-sync assertThat(syncState.isInSync()).isTrue(); - verify(inSyncListener).onSyncStatusChanged(true); + assertThat(syncState.isInSync(0)).isTrue(); + verifyNoMoreInteractions(inSyncListener); + verify(inSyncListenerExact).onInSyncStatusChange(true); + } + + @Test + public void addInSyncListener_whileOutOfSync() { + setupOutOfSyncState(); + + // Add listener + InSyncListener newListener = mock(InSyncListener.class); + syncState.subscribeInSync(newListener); + verify(newListener, never()).onInSyncStatusChange(false); + verify(newListener, never()).onInSyncStatusChange(true); + + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L, + TARGET_DIFFICULTY.plus(10L)); + + final ArgumentCaptor inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(newListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture()); + + final List syncChanges = inSyncEventCaptor.getAllValues(); + assertThat(syncChanges.get(0)).isEqualTo(false); + assertThat(syncChanges.get(1)).isEqualTo(true); + assertThat(syncChanges.get(2)).isEqualTo(false); + } + + @Test + public void addInSyncListener_whileOutOfSync_withDistinctSyncTolerance() { + setupOutOfSyncState(); + + // Add listener + final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE * 2; + InSyncListener newListener = mock(InSyncListener.class); + syncState.subscribeInSync(newListener, syncTolerance); + verify(newListener, never()).onInSyncStatusChange(false); + verify(newListener, never()).onInSyncStatusChange(true); + + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + syncTolerance + 1L, + TARGET_DIFFICULTY.plus(10L)); + + final ArgumentCaptor inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(newListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture()); + + final List syncChanges = inSyncEventCaptor.getAllValues(); + assertThat(syncChanges.get(0)).isEqualTo(false); + assertThat(syncChanges.get(1)).isEqualTo(true); + assertThat(syncChanges.get(2)).isEqualTo(false); + } + + @Test + public void addInSyncListener_whileInSync() { + setupOutOfSyncState(); + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // Add listener + InSyncListener newListener = mock(InSyncListener.class); + syncState.subscribeInSync(newListener); + verify(newListener, never()).onInSyncStatusChange(false); + verify(newListener, never()).onInSyncStatusChange(true); + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L, + TARGET_DIFFICULTY.plus(10L)); + verify(newListener).onInSyncStatusChange(false); + verify(newListener, never()).onInSyncStatusChange(true); + + // Catch up + advanceLocalChain(TARGET_CHAIN_HEIGHT + 1L); + verify(newListener).onInSyncStatusChange(false); + verify(newListener).onInSyncStatusChange(true); + } + + @Test + public void addInSyncListener_whileInSync_withDistinctSyncTolerance() { + final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE * 2; + setupOutOfSyncState(); + + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // Add listener + InSyncListener newListener = mock(InSyncListener.class); + syncState.subscribeInSync(newListener, syncTolerance); + verify(newListener, never()).onInSyncStatusChange(false); + verify(newListener, never()).onInSyncStatusChange(true); + + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + syncTolerance + 1L, + TARGET_DIFFICULTY.plus(10L)); + verify(newListener).onInSyncStatusChange(false); + verify(newListener, never()).onInSyncStatusChange(true); + + // Catch up + advanceLocalChain(TARGET_CHAIN_HEIGHT + 1L); + verify(newListener).onInSyncStatusChange(false); + verify(newListener).onInSyncStatusChange(true); + } + + @Test + public void removeInSyncListener_doesntReceiveSubsequentEvents() { + final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L; + setupOutOfSyncState(); + + // Add listener + InSyncListener newListener = mock(InSyncListener.class); + final long subscriberId = syncState.subscribeInSync(newListener, syncTolerance); + verify(newListener, never()).onInSyncStatusChange(anyBoolean()); + + // Remove listener + syncState.unsubscribeInSync(subscriberId); + + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // We should not register the in-sync event + verify(newListener, never()).onInSyncStatusChange(anyBoolean()); + + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + syncTolerance + 1L, + TARGET_DIFFICULTY.plus(10L)); + + // We should not register the sync event + verify(newListener, never()).onInSyncStatusChange(anyBoolean()); + + // Other listeners should keep running + verify(inSyncListenerExact, times(2)).onInSyncStatusChange(false); + verify(inSyncListenerExact).onInSyncStatusChange(true); + } + + @Test + public void removeInSyncListener_addAdditionalListenerBeforeRemoving() { + final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L; + setupOutOfSyncState(); + + // Add listener + InSyncListener listenerToRemove = mock(InSyncListener.class); + InSyncListener otherListener = mock(InSyncListener.class); + final long subscriberId = syncState.subscribeInSync(listenerToRemove, syncTolerance); + syncState.subscribeInSync(otherListener, syncTolerance); + + // Remove listener + syncState.unsubscribeInSync(subscriberId); + + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // We should not register the in-sync event + verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean()); + + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + syncTolerance + 1L, + TARGET_DIFFICULTY.plus(10L)); + + // We should not register the in-sync event + verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean()); + + final ArgumentCaptor inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(otherListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture()); + + final List syncChanges = inSyncEventCaptor.getAllValues(); + assertThat(syncChanges.get(0)).isEqualTo(false); + assertThat(syncChanges.get(1)).isEqualTo(true); + assertThat(syncChanges.get(2)).isEqualTo(false); + + // Other listeners should keep running + verify(inSyncListenerExact).onInSyncStatusChange(true); + verify(inSyncListenerExact, times(2)).onInSyncStatusChange(false); + } + + @Test + public void removeInSyncListener_addAdditionalListenerAfterRemoving() { + final long syncTolerance = Synchronizer.DEFAULT_IN_SYNC_TOLERANCE + 1L; + setupOutOfSyncState(); + + // Add listener + InSyncListener listenerToRemove = mock(InSyncListener.class); + InSyncListener otherListener = mock(InSyncListener.class); + final long subscriberId = syncState.subscribeInSync(listenerToRemove, syncTolerance); + + // Remove listener + syncState.unsubscribeInSync(subscriberId); + + // Add new listener + syncState.subscribeInSync(otherListener, syncTolerance); + + // Catch all the way up + advanceLocalChain(TARGET_CHAIN_HEIGHT); + + // We should not register the sync event + verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean()); + + // Fall out of sync + updateChainState( + syncTargetPeer.getEthPeer(), + TARGET_CHAIN_HEIGHT + syncTolerance + 1L, + TARGET_DIFFICULTY.plus(10L)); + + // We should not register the sync event + verify(listenerToRemove, never()).onInSyncStatusChange(anyBoolean()); + + final ArgumentCaptor inSyncEventCaptor = ArgumentCaptor.forClass(Boolean.class); + verify(otherListener, times(3)).onInSyncStatusChange(inSyncEventCaptor.capture()); + + final List syncChanges = inSyncEventCaptor.getAllValues(); + assertThat(syncChanges.get(0)).isEqualTo(false); + assertThat(syncChanges.get(1)).isEqualTo(true); + assertThat(syncChanges.get(2)).isEqualTo(false); + + // Other listeners should keep running + verify(inSyncListenerExact, times(2)).onInSyncStatusChange(false); + verify(inSyncListenerExact).onInSyncStatusChange(true); } @Test public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() { final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class); - syncState.addSyncStatusListener(syncStatusListener); + syncState.subscribeSyncStatus(syncStatusListener); - blockAddedObserver.onBlockAdded( - BlockAddedEvent.createForHeadAdvancement( - new Block( - targetBlockHeader(), - new BlockBody(Collections.emptyList(), Collections.emptyList()))), - blockchain); + advanceLocalChain(OUR_CHAIN_HEAD_NUMBER + 1L); verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus())); } @Test - public void shouldReportReorgEvents() { - when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT); - - blockAddedObserver.onBlockAdded( - BlockAddedEvent.createForChainReorg( - new Block( - targetBlockHeader(), - new BlockBody(Collections.emptyList(), Collections.emptyList())), - Collections.emptyList(), - Collections.emptyList()), - blockchain); + public void shouldHandleSyncThenReorg() { + // Sync up to the target + final int expectedSyncEvents = (int) TARGET_CHAIN_DELTA; + advanceLocalChain(TARGET_CHAIN_HEIGHT); + // Perform a shallow reorg + final int expectedReorgEvents = 2; + reorgLocalChain(TARGET_CHAIN_HEIGHT - 1, TARGET_CHAIN_HEIGHT, UInt256.of(2L)); assertThat(syncState.isInSync()).isTrue(); final ArgumentCaptor captor = ArgumentCaptor.forClass(SyncStatus.class); - verify(syncStatusListener, times(2)).onSyncStatusChanged(captor.capture()); - assertThat(captor.getAllValues().get(0).inSync()).isFalse(); - assertThat(captor.getAllValues().get(1).inSync()).isTrue(); + verify(syncStatusListener, times(expectedSyncEvents + expectedReorgEvents)) + .onSyncStatusChanged(captor.capture()); + + final List eventValues = captor.getAllValues(); + + // Check the initial set of events corresponding to block advancement while we're out of sync + for (int i = 0; i < eventValues.size(); i++) { + + final SyncStatus syncStatus = eventValues.get(i); + if (i == eventValues.size() - 1) { + // Last event should be the in-sync reorg event + assertThat(syncStatus.inSync()).isTrue(); + assertThat(syncStatus.getCurrentBlock()).isEqualTo(TARGET_CHAIN_HEIGHT); + // TODO - finalize the start, current, and highest block values should be + } else if (i == eventValues.size() - 2) { + // Second-to-last event should be the in-sync reorg event + assertThat(syncStatus.inSync()).isFalse(); + assertThat(syncStatus.getCurrentBlock()).isEqualTo(TARGET_CHAIN_HEIGHT); + // TODO - finalize the start, current, and highest block values should be + } else if (i == eventValues.size() - 3) { + // Third-to-last event should be the event when the node finally reaches sync + assertThat(syncStatus.inSync()).isTrue(); + assertThat(syncStatus.getCurrentBlock()).isEqualTo(TARGET_CHAIN_HEIGHT); + assertThat(syncStatus.getHighestBlock()).isEqualTo(TARGET_CHAIN_HEIGHT); + // TODO - verify desired startingBlock value + } else { + // All previous events should correspond to the initial sync + assertThat(syncStatus.inSync()).isFalse(); + assertThat(syncStatus.getCurrentBlock()).isEqualTo(OUR_CHAIN_HEAD_NUMBER + i + 1); + assertThat(syncStatus.getHighestBlock()).isEqualTo(TARGET_CHAIN_HEIGHT); + // TODO - verify desired startingBlock value + } + } + } + + private RespondingEthPeer createPeer(final UInt256 totalDifficulty, final long blockHeight) { + return EthProtocolManagerTestUtil.createPeer(ethProtocolManager, totalDifficulty, blockHeight); + } + + private EthPeer mockWorseChain(final EthPeer peer) { + return mockChainIsBetterThan(peer, false); + } + + private EthPeer mockBetterChain(final EthPeer peer) { + return mockChainIsBetterThan(peer, true); + } + + private EthPeer mockChainIsBetterThan(final EthPeer peer, final boolean isBetter) { + final ChainState chainState = spy(peer.chainState()); + final ChainHeadEstimate chainStateSnapshot = spy(peer.chainStateSnapshot()); + doReturn(isBetter).when(chainState).chainIsBetterThan(any()); + doReturn(isBetter).when(chainStateSnapshot).chainIsBetterThan(any()); + final EthPeer mockedPeer = spy(peer); + doReturn(chainStateSnapshot).when(chainState).getSnapshot(); + doReturn(chainStateSnapshot).when(mockedPeer).chainStateSnapshot(); + doReturn(chainState).when(mockedPeer).chainState(); + return mockedPeer; } private void setupOutOfSyncState() { - updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); - syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L)); - assertThat(syncState.isInSync()).isFalse(); - verify(inSyncListener).onSyncStatusChanged(false); + syncState.setSyncTarget(syncTargetPeer.getEthPeer(), blockchain.getGenesisBlock().getHeader()); + verify(inSyncListener).onInSyncStatusChange(false); + verify(inSyncListenerExact).onInSyncStatusChange(false); + } + + private void advanceLocalChain(final long newChainHeight) { + while (blockchain.getChainHeadBlockNumber() < newChainHeight) { + final BlockHeader parent = blockchain.getChainHeadHeader(); + final Block block = + gen.block( + BlockOptions.create() + .setDifficulty(standardDifficultyPerBlock) + .setParentHash(parent.getHash()) + .setBlockNumber(parent.getNumber() + 1L)); + final List receipts = gen.receipts(block); + blockchain.appendBlock(block, receipts); + } + } + + private void reorgLocalChain( + final long commonAncestor, final long newHeight, final UInt256 difficultyPerBlock) { + BlockHeader currentBlock = blockchain.getBlockHeader(commonAncestor).get(); + while (currentBlock.getNumber() < newHeight) { + final Block block = + gen.block( + BlockOptions.create() + .setDifficulty(difficultyPerBlock) + .setParentHash(currentBlock.getHash()) + .setBlockNumber(currentBlock.getNumber() + 1L)); + final List receipts = gen.receipts(block); + blockchain.appendBlock(block, receipts); + currentBlock = block.getHeader(); + } } /** * Updates the chain state, such that the peer will end up with an estimated height of {@code * blockHeight} and an estimated total difficulty of {@code totalDifficulty} * - * @param chainState The chain state to update + * @param peer The peer whose chain should be updated * @param blockHeight The target estimated block height * @param totalDifficulty The total difficulty */ private void updateChainState( - final ChainState chainState, final long blockHeight, final UInt256 totalDifficulty) { + final EthPeer peer, final long blockHeight, final UInt256 totalDifficulty) { // Chain state is updated based on the parent of the announced block // So, increment block number by 1 and set block difficulty to zero // in order to update to the values we want @@ -291,18 +626,10 @@ private void updateChainState( .number(blockHeight + 1L) .difficulty(UInt256.ZERO) .buildHeader(); - chainState.updateForAnnouncedBlock(header, totalDifficulty); + peer.chainState().updateForAnnouncedBlock(header, totalDifficulty); // Sanity check this logic still holds - assertThat(chainState.getEstimatedHeight()).isEqualTo(blockHeight); - assertThat(chainState.getEstimatedTotalDifficulty()).isEqualTo(totalDifficulty); - } - - private BlockHeader targetBlockHeader() { - return blockHeaderAt(TARGET_CHAIN_HEIGHT); - } - - private BlockHeader blockHeaderAt(final long blockNumber) { - return new BlockHeaderTestFixture().number(blockNumber).buildHeader(); + assertThat(peer.chainState().getEstimatedHeight()).isEqualTo(blockHeight); + assertThat(peer.chainState().getEstimatedTotalDifficulty()).isEqualTo(totalDifficulty); } } diff --git a/ethereum/permissioning/src/main/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java b/ethereum/permissioning/src/main/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java index d5db7827af9..ed07c6d550b 100644 --- a/ethereum/permissioning/src/main/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java +++ b/ethereum/permissioning/src/main/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProvider.java @@ -20,14 +20,13 @@ import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.ethereum.permissioning.node.NodePermissioningProvider; import org.hyperledger.besu.metrics.BesuMetricCategory; -import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; import java.net.URI; import java.util.Collection; -import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class SyncStatusNodePermissioningProvider implements NodePermissioningProvider { @@ -37,8 +36,8 @@ public class SyncStatusNodePermissioningProvider implements NodePermissioningPro private final Counter checkCounter; private final Counter checkCounterPermitted; private final Counter checkCounterUnpermitted; - private OptionalLong syncStatusObserverId; - private boolean hasReachedSync = false; + private final long inSyncSubscriberId; + private final AtomicBoolean hasReachedSync = new AtomicBoolean(false); public SyncStatusNodePermissioningProvider( final Synchronizer synchronizer, @@ -46,8 +45,7 @@ public SyncStatusNodePermissioningProvider( final MetricsSystem metricsSystem) { checkNotNull(synchronizer); this.synchronizer = synchronizer; - long id = this.synchronizer.observeSyncStatus(this::handleSyncStatusUpdate); - this.syncStatusObserverId = OptionalLong.of(id); + this.inSyncSubscriberId = this.synchronizer.subscribeInSync(this::handleInSyncEvent, 0); this.fixedNodes = fixedNodes.stream().map(EnodeURL::toURIWithoutDiscoveryPort).collect(Collectors.toSet()); @@ -55,7 +53,7 @@ public SyncStatusNodePermissioningProvider( BesuMetricCategory.PERMISSIONING, "sync_status_node_sync_reached", "Whether the sync status permissioning provider has realised sync yet", - () -> hasReachedSync ? 1 : 0); + () -> hasReachedSync.get() ? 1 : 0); this.checkCounter = metricsSystem.createCounter( BesuMetricCategory.PERMISSIONING, @@ -73,20 +71,10 @@ public SyncStatusNodePermissioningProvider( "Number of times the sync status permissioning provider has been checked and returned unpermitted"); } - private void handleSyncStatusUpdate(final SyncStatus syncStatus) { - if (syncStatus != null) { - long blocksBehind = syncStatus.getHighestBlock() - syncStatus.getCurrentBlock(); - if (blocksBehind <= 0) { - synchronized (this) { - if (!hasReachedSync) { - syncStatusObserverId.ifPresent( - id -> { - synchronizer.removeObserver(id); - syncStatusObserverId = OptionalLong.empty(); - }); - hasReachedSync = true; - } - } + private void handleInSyncEvent(final boolean isInSync) { + if (isInSync) { + if (hasReachedSync.compareAndSet(false, true)) { + synchronizer.unsubscribeInSync(inSyncSubscriberId); } } } @@ -104,7 +92,7 @@ private void handleSyncStatusUpdate(final SyncStatus syncStatus) { */ @Override public boolean isPermitted(final EnodeURL sourceEnode, final EnodeURL destinationEnode) { - if (hasReachedSync) { + if (hasReachedSync.get()) { return true; } else { checkCounter.inc(); @@ -119,6 +107,6 @@ public boolean isPermitted(final EnodeURL sourceEnode, final EnodeURL destinatio } public boolean hasReachedSync() { - return hasReachedSync; + return hasReachedSync.get(); } } diff --git a/ethereum/permissioning/src/test/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java b/ethereum/permissioning/src/test/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java index 10a708584fc..4ccd4e15365 100644 --- a/ethereum/permissioning/src/test/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java +++ b/ethereum/permissioning/src/test/java/org/hyperledger/besu/ethereum/permissioning/node/provider/SyncStatusNodePermissioningProviderTest.java @@ -21,11 +21,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.hyperledger.besu.ethereum.core.SyncStatus; import org.hyperledger.besu.ethereum.core.Synchronizer; +import org.hyperledger.besu.ethereum.core.Synchronizer.InSyncListener; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.metrics.BesuMetricCategory; -import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.plugin.services.metrics.Counter; @@ -62,14 +61,16 @@ public class SyncStatusNodePermissioningProviderTest { @Mock private Synchronizer synchronizer; private Collection bootnodes = new ArrayList<>(); private SyncStatusNodePermissioningProvider provider; - private SyncStatusListener syncStatusListener; - private long syncStatusObserverId = 1L; + private InSyncListener inSyncListener; @Before public void before() { - final ArgumentCaptor captor = - ArgumentCaptor.forClass(SyncStatusListener.class); - when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(syncStatusObserverId); + final ArgumentCaptor inSyncSubscriberCaptor = + ArgumentCaptor.forClass(InSyncListener.class); + final ArgumentCaptor syncToleranceCaptor = ArgumentCaptor.forClass(Long.class); + when(synchronizer.subscribeInSync( + inSyncSubscriberCaptor.capture(), syncToleranceCaptor.capture())) + .thenReturn(1L); bootnodes.add(bootnode); @SuppressWarnings("unchecked") @@ -92,7 +93,8 @@ public void before() { "Number of times the sync status permissioning provider has been checked and returned unpermitted")) .thenReturn(checkUnpermittedCounter); this.provider = new SyncStatusNodePermissioningProvider(synchronizer, bootnodes, metricsSystem); - this.syncStatusListener = captor.getValue(); + this.inSyncListener = inSyncSubscriberCaptor.getValue(); + assertThat(syncToleranceCaptor.getValue()).isEqualTo(0); verify(metricsSystem) .createIntegerGauge( eq(BesuMetricCategory.PERMISSIONING), @@ -101,12 +103,12 @@ public void before() { syncGaugeCallbackCaptor.capture()); this.syncGauge = syncGaugeCallbackCaptor.getValue(); - verify(synchronizer).observeSyncStatus(any()); + verify(synchronizer).subscribeInSync(any(), eq(0L)); } @Test public void whenIsNotInSyncHasReachedSyncShouldReturnFalse() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); + inSyncListener.onInSyncStatusChange(false); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -114,7 +116,7 @@ public void whenIsNotInSyncHasReachedSyncShouldReturnFalse() { @Test public void whenInSyncHasReachedSyncShouldReturnTrue() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 1)); + inSyncListener.onInSyncStatusChange(true); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); @@ -122,22 +124,21 @@ public void whenInSyncHasReachedSyncShouldReturnTrue() { @Test public void whenInSyncChangesFromTrueToFalseHasReachedSyncShouldReturnTrue() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); + inSyncListener.onInSyncStatusChange(false); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 2, 1)); + inSyncListener.onInSyncStatusChange(true); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 2, 3)); + inSyncListener.onInSyncStatusChange(false); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); } @Test public void whenHasNotSyncedNonBootnodeShouldNotBePermitted() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -151,7 +152,6 @@ public void whenHasNotSyncedNonBootnodeShouldNotBePermitted() { @Test public void whenHasNotSyncedBootnodeIncomingConnectionShouldNotBePermitted() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -165,7 +165,48 @@ public void whenHasNotSyncedBootnodeIncomingConnectionShouldNotBePermitted() { @Test public void whenHasNotSyncedBootnodeOutgoingConnectionShouldBePermitted() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); + assertThat(provider.hasReachedSync()).isFalse(); + assertThat(syncGauge.getAsInt()).isEqualTo(0); + + boolean isPermitted = provider.isPermitted(enode1, bootnode); + + assertThat(isPermitted).isTrue(); + verify(checkCounter, times(1)).inc(); + verify(checkPermittedCounter, times(1)).inc(); + verify(checkUnpermittedCounter, times(0)).inc(); + } + + @Test + public void whenOutOfSyncNonBootnodeShouldNotBePermitted() { + inSyncListener.onInSyncStatusChange(false); + assertThat(provider.hasReachedSync()).isFalse(); + assertThat(syncGauge.getAsInt()).isEqualTo(0); + + boolean isPermitted = provider.isPermitted(enode1, enode2); + + assertThat(isPermitted).isFalse(); + verify(checkCounter, times(1)).inc(); + verify(checkPermittedCounter, times(0)).inc(); + verify(checkUnpermittedCounter, times(1)).inc(); + } + + @Test + public void whenOutOfSyncBootnodeIncomingConnectionShouldNotBePermitted() { + inSyncListener.onInSyncStatusChange(false); + assertThat(provider.hasReachedSync()).isFalse(); + assertThat(syncGauge.getAsInt()).isEqualTo(0); + + boolean isPermitted = provider.isPermitted(bootnode, enode1); + + assertThat(isPermitted).isFalse(); + verify(checkCounter, times(1)).inc(); + verify(checkPermittedCounter, times(0)).inc(); + verify(checkUnpermittedCounter, times(1)).inc(); + } + + @Test + public void whenOutOfSyncBootnodeOutgoingConnectionShouldBePermitted() { + inSyncListener.onInSyncStatusChange(false); assertThat(provider.hasReachedSync()).isFalse(); assertThat(syncGauge.getAsInt()).isEqualTo(0); @@ -179,7 +220,7 @@ public void whenHasNotSyncedBootnodeOutgoingConnectionShouldBePermitted() { @Test public void whenHasSyncedIsPermittedShouldReturnTrue() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 1)); + inSyncListener.onInSyncStatusChange(true); assertThat(provider.hasReachedSync()).isTrue(); assertThat(syncGauge.getAsInt()).isEqualTo(1); @@ -193,7 +234,7 @@ public void whenHasSyncedIsPermittedShouldReturnTrue() { @Test public void syncStatusPermissioningCheckShouldIgnoreEnodeURLDiscoveryPort() { - syncStatusListener.onSyncStatusChanged(new SyncStatus(0, 1, 2)); + inSyncListener.onInSyncStatusChange(false); assertThat(provider.hasReachedSync()).isFalse(); final EnodeURL bootnode =