Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public void removeTransactionDroppedListener(final long listenerIdentifier) {

@Override
public long addSyncStatusListener(final SyncStatusListener syncStatusListener) {
return syncState.addSyncStatusListener(syncStatusListener);
return syncState.subscribeSyncStatus(syncStatusListener);
}

@Override
public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.removeSyncStatusListener(listenerIdentifier);
syncState.unsubscribeSyncStatus(listenerIdentifier);
}

private static PropagatedBlockContext blockPropagatedContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class SyncingSubscriptionService {
public SyncingSubscriptionService(
final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) {
this.subscriptionManager = subscriptionManager;
synchronizer.observeSyncStatus(this::sendSyncingToMatchingSubscriptions);
synchronizer.subscribeSyncStatus(this::sendSyncingToMatchingSubscriptions);
}

private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class SyncingSubscriptionServiceTest {
public void before() {
final ArgumentCaptor<SyncStatusListener> captor =
ArgumentCaptor.forClass(SyncStatusListener.class);
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L);
when(synchronizer.subscribeSyncStatus(captor.capture())).thenReturn(1L);
new SyncingSubscriptionService(subscriptionManager, synchronizer);
syncStatusListener = captor.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public AbstractMiningCoordinator(
this.blockchain = blockchain;
this.syncState = syncState;
this.blockchain.observeBlockAdded(this);
syncState.addInSyncListener(this::inSyncChanged);
syncState.subscribeInSync(this::inSyncChanged);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
/** Provides an interface to block synchronization processes. */
public interface Synchronizer {

// Default tolerance used to determine whether or not this node is "in sync"
long DEFAULT_IN_SYNC_TOLERANCE = 5;

void start();

void stop();
Expand All @@ -32,7 +35,43 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();

long observeSyncStatus(final BesuEvents.SyncStatusListener listener);
long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener);

boolean unsubscribeSyncStatus(long observerId);

/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code DEFAULT_IN_SYNC_TOLERANCE} behind the
* highest estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @return A subscription id that can be used to unsubscribe from these events
*/
long subscribeInSync(final InSyncListener listener);

/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code syncTolerance} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of
* zero means that the node is considered in-sync only when the local chain height is greater
* than or equal to the best estimated remote chain height.
* @return A subscription id that can be used to unsubscribe from these events
*/
long subscribeInSync(final InSyncListener listener, final long syncTolerance);

/**
* Unsubscribe from in sync events.
*
* @param listenerId The id returned when subscribing
* @return {@code true} if a subscription was cancelled
*/
boolean unsubscribeInSync(final long listenerId);

boolean removeObserver(long observerId);
@FunctionalInterface
interface InSyncListener {
void onInSyncStatusChange(boolean newSyncStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,14 @@ public List<Block> blockSequence(
}

public Block genesisBlock() {
final BlockOptions options =
new BlockOptions()
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return genesisBlock(new BlockOptions());
}

public Block genesisBlock(final BlockOptions options) {
options
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return block(options);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.util.uint.UInt256;

public interface ChainHeadEstimate {

UInt256 getEstimatedTotalDifficulty();

long getEstimatedHeight();

/**
* Returns true if this chain state represents a chain that is "better" than the chain represented
* by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or
* heavier than the supplied {@code chainToCheck}.
*
* @param chainToCheck The chain being compared.
* @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}.
*/
default boolean chainIsBetterThan(final ChainHead chainToCheck) {
return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck);
}

default boolean hasHigherDifficultyThan(final ChainHead chainToCheck) {
return getEstimatedTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0;
}

default boolean hasLongerChainThan(final ChainHead chainToCheck) {
return getEstimatedHeight() > chainToCheck.getHeight();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.util.Subscribers;
import org.hyperledger.besu.util.uint.UInt256;

import com.google.common.base.MoreObjects;

public class ChainState {
public class ChainState implements ChainHeadEstimate {
// The best block by total difficulty that we know about
private final BestBlock bestBlock = new BestBlock();
// The highest block that we've seen
Expand All @@ -40,14 +39,20 @@ public void removeEstimatedHeightListener(final long listenerId) {
estimatedHeightListeners.unsubscribe(listenerId);
}

public ChainStateSnapshot getSnapshot() {
return new ChainStateSnapshot(getEstimatedTotalDifficulty(), getEstimatedHeight());
}

public boolean hasEstimatedHeight() {
return estimatedHeightKnown;
}

@Override
public long getEstimatedHeight() {
return estimatedHeight;
}

@Override
public UInt256 getEstimatedTotalDifficulty() {
return bestBlock.getTotalDifficulty();
}
Expand Down Expand Up @@ -106,26 +111,6 @@ public void updateHeightEstimate(final long blockNumber) {
}
}

/**
* Returns true if this chain state represents a chain that is "better" than the chain represented
* by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or
* heavier than the supplied {@code chainToCheck}.
*
* @param chainToCheck The chain being compared.
* @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}.
*/
public boolean chainIsBetterThan(final ChainHead chainToCheck) {
return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck);
}

private boolean hasHigherDifficultyThan(final ChainHead chainToCheck) {
return bestBlock.getTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0;
}

private boolean hasLongerChainThan(final ChainHead chainToCheck) {
return estimatedHeight > chainToCheck.getHeight();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.util.uint.UInt256;

public class ChainStateSnapshot implements ChainHeadEstimate {
private final UInt256 totalDifficulty;
private final long chainHeight;

public ChainStateSnapshot(final UInt256 totalDifficulty, final long chainHeight) {
this.totalDifficulty = totalDifficulty;
this.chainHeight = chainHeight;
}

@Override
public UInt256 getEstimatedTotalDifficulty() {
return totalDifficulty;
}

@Override
public long getEstimatedHeight() {
return chainHeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,16 @@ public boolean hasSeenBlock(final Hash hash) {
return knownBlocks.contains(hash);
}

/** @return This peer's current chain state. */
public ChainState chainState() {
return chainHeadState;
}

/** @return A read-only snapshot of this peer's current {@code chainState} } */
public ChainHeadEstimate chainStateSnapshot() {
return chainHeadState.getSnapshot();
}

public void registerHeight(final Hash blockHash, final long height) {
chainHeadState.update(blockHash, height);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;
import org.hyperledger.besu.util.Subscribers;

import java.nio.file.Path;
import java.time.Clock;
Expand All @@ -51,7 +50,6 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final Optional<Pruner> maybePruner;
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private final BlockPropagationManager<C> blockPropagationManager;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;
private final FullSyncDownloader<C> fullSyncDownloader;
Expand Down Expand Up @@ -126,7 +124,6 @@ private TrailingPeerRequirements calculateTrailingPeerRequirements() {
public void start() {
if (running.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
syncState.addSyncStatusListener(this::syncStatusCallback);
blockPropagationManager.start();
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
Expand Down Expand Up @@ -187,17 +184,28 @@ public Optional<SyncStatus> getSyncStatus() {
}

@Override
public long observeSyncStatus(final SyncStatusListener listener) {
public long subscribeSyncStatus(final SyncStatusListener listener) {
checkNotNull(listener);
return syncStatusListeners.subscribe(listener);
return syncState.subscribeSyncStatus(listener);
}

@Override
public boolean removeObserver(final long observerId) {
return syncStatusListeners.unsubscribe(observerId);
public boolean unsubscribeSyncStatus(final long subscriberId) {
return syncState.unsubscribeSyncStatus(subscriberId);
}

private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status));
@Override
public long subscribeInSync(final InSyncListener listener) {
return syncState.subscribeInSync(listener);
}

@Override
public long subscribeInSync(final InSyncListener listener, final long syncTolerance) {
return syncState.subscribeInSync(listener, syncTolerance);
}

@Override
public boolean unsubscribeInSync(final long listenerId) {
return syncState.unsubscribeSyncStatus(listenerId);
}
}
Loading