Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
releaseVersion=0.2.1
besuVersion=23.4.1
releaseVersion=0.2.2.2-SNAPSHOT
# this likely will be 24.5.3 when https://github.com/hyperledger/besu/pull/6641 is released
besuVersion=24.5.1
4 changes: 2 additions & 2 deletions gradle/versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencyManagement {
dependency 'tech.pegasys.tools.epchecks:errorprone-checks:1.1.1'

// Besu dependencies
dependency "org.hyperledger.besu:plugin-api:23.7.4-SNAPSHOT-KT"
dependency "org.hyperledger.besu:plugin-api:${besuVersion}"
dependency "org.hyperledger.besu:besu-datatypes:${besuVersion}"
dependency "org.hyperledger.besu.internal:pipeline:${besuVersion}"
dependency "org.hyperledger.besu.internal:rlp:${besuVersion}"
Expand Down Expand Up @@ -86,4 +86,4 @@ dependencyManagement {


}
}
}
48 changes: 43 additions & 5 deletions src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.consensys.fleet.common.peer.PeerNodesManager;
import net.consensys.fleet.common.rpc.client.WebClientWrapper;
import net.consensys.fleet.common.rpc.json.ConvertMapperProvider;
import net.consensys.fleet.common.rpc.server.FleetGetConfigServer;
import net.consensys.fleet.common.rpc.server.PluginRpcMethod;
import net.consensys.fleet.common.trielog.FleetTrieLogService;
import net.consensys.fleet.follower.event.InitialSyncCompletionObserver;
Expand All @@ -37,6 +38,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import com.google.auto.service.AutoService;
import org.hyperledger.besu.plugin.BesuContext;
Expand Down Expand Up @@ -66,6 +69,8 @@ public class FleetPlugin implements BesuPlugin {
private WebClientWrapper webClient;

private FleetModeSynchronizer fleetModeSynchronizer;
private final AtomicLong leaderBlockAddedObserverId = new AtomicLong(-1);
private final AtomicLong followerSyncCompletionListenerId = new AtomicLong(-1);

// TODO Spit logic of besu plugin to leader besu plugin and follower besu plugin
@Override
Expand Down Expand Up @@ -192,10 +197,41 @@ private void createPeerNetworkMaintainer() {
peerNetworkMaintainer.start();
}

@Override
public CompletableFuture<Void> reloadConfiguration() {

LOG.info("Reloading configuration");
LOG.info(FleetOptions.create().toString());

if (leaderBlockAddedObserverId.get() != -1) {
besuContext
.getService(BesuEvents.class)
.ifPresent(
besuEvents -> {
besuEvents.removeBlockAddedListener(leaderBlockAddedObserverId.get());
leaderBlockAddedObserverId.set(-1);
});
}
if (followerSyncCompletionListenerId.get() != -1) {
besuContext
.getService(BesuEvents.class)
.ifPresent(
besuEvents -> {
besuEvents.removeSyncStatusListener(followerSyncCompletionListenerId.get());
followerSyncCompletionListenerId.set(-1);
});
}
// reload clients methods
loadingClientsMethods();
LOG.info("Configuration reloaded");
return CompletableFuture.completedFuture(null);
}

private List<PluginRpcMethod> createServerMethods() {
final List<PluginRpcMethod> methods = new ArrayList<>();
final BlockContextProvider blockContextProvider =
new BlockContextProvider(pluginServiceProvider, new FleetGetBlockClient(webClient));
methods.add(new FleetGetConfigServer(convertMapperProvider));
methods.add(new FleetAddFollowerServer(peerManagers));
methods.add(new FleetGetBlockServer(convertMapperProvider, pluginServiceProvider));
fleetModeSynchronizer =
Expand All @@ -217,26 +253,28 @@ private void loadingClientsMethods() {
switch (CLI_OPTIONS.getNodeRole()) {
case LEADER -> {
/* ********** LEADER ************* */
LOG.debug("Adding blockchain observer");
LOG.info("Adding blockchain observer");
final BlockAddedObserver blockAddedObserver =
new BlockAddedObserver(pluginServiceProvider, new FleetShipNewHeadClient(webClient));
besuContext
.getService(BesuEvents.class)
.ifPresentOrElse(
besuEvents -> {
besuEvents.addBlockAddedListener(blockAddedObserver);
leaderBlockAddedObserverId.set(
besuEvents.addBlockAddedListener(blockAddedObserver));
},
() -> LOG.error("Could not obtain BesuEvents"));
}
case FOLLOWER -> {
/* ********** FOLLOWER ************* */
LOG.debug("Adding sync status observer");
LOG.info("Adding sync status observer");
besuContext
.getService(BesuEvents.class)
.ifPresentOrElse(
besuEvents -> {
besuEvents.addInitialSyncCompletionListener(
new InitialSyncCompletionObserver(() -> fleetModeSynchronizer));
followerSyncCompletionListenerId.set(
besuEvents.addInitialSyncCompletionListener(
new InitialSyncCompletionObserver(() -> fleetModeSynchronizer)));
},
() -> LOG.error("Could not obtain BesuEvents"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright ConsenSys 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package net.consensys.fleet.common.rpc.server;

import net.consensys.fleet.common.config.FleetOptions;
import net.consensys.fleet.common.rpc.json.ConvertMapperProvider;

import org.hyperledger.besu.plugin.services.rpc.PluginRpcRequest;

public class FleetGetConfigServer implements PluginRpcMethod {

private final ConvertMapperProvider convertMapperProvider;

public FleetGetConfigServer(final ConvertMapperProvider convertMapperProvider) {
this.convertMapperProvider = convertMapperProvider;
}

@Override
public String getName() {
return "getConfig";
}

@Override
public Object execute(final PluginRpcRequest rpcRequest) {
return convertMapperProvider.getJsonConverter().valueToTree(FleetOptions.create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

public interface PluginRpcMethod {

String getNamespace();
default String getNamespace() {
return "fleet";
}
;

String getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ public FleetShipNewHeadServer(
this.pluginServiceProvider = pluginServiceProvider;
}

@Override
public String getNamespace() {
return "fleet";
}

@Override
public String getName() {
return "shipNewHead";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void startSync() {

final TrieLogProvider trieLogProvider =
pluginServiceProvider.getService(TrieLogService.class).getTrieLogProvider();
BlockHeader chainHead = blockchainService.getChainHead();
BlockHeader chainHead = blockchainService.getChainHeadHeader();
try {
do {

Expand Down Expand Up @@ -249,7 +249,7 @@ private void startSync() {
final BlockContext oldHead =
getLocalBlockContext(chainHead.getNumber()).orElseThrow();
// update chain head
chainHead = blockchainService.getChainHead();
chainHead = blockchainService.getChainHeadHeader();
final BlockContext newHead =
getLocalBlockContext(chainHead.getNumber()).orElseThrow();

Expand Down Expand Up @@ -320,9 +320,9 @@ private void logImportedBlockInfo(final BlockHeader header, final BlockBody body
message.append(" / %d ws");
messageArgs.add(body.getWithdrawals().get().size());
}
if (body.getDeposits().isPresent()) {
if (body.getRequests().isPresent()) {
message.append(" / %d ds");
messageArgs.add(body.getDeposits().get().size());
messageArgs.add(body.getRequests().get().size());
}
message.append(" / base fee %s / %,d (%01.1f%%) gas / (%s)");
messageArgs.addAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import org.hyperledger.besu.plugin.data.AddedBlockContext;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.BlockchainService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlockAddedObserver implements BesuEvents.BlockAddedListener {

private final PluginServiceProvider pluginServiceProvider;
private final FleetShipNewHeadClient stateShipNewHeadSender;
private static final Logger LOG = LoggerFactory.getLogger(BlockAddedObserver.class);

public BlockAddedObserver(
final PluginServiceProvider pluginServiceProvider,
Expand All @@ -37,6 +40,10 @@ public BlockAddedObserver(

@Override
public void onBlockAdded(final AddedBlockContext addedBlockContext) {
LOG.atDebug()
.setMessage("New block added: {}")
.addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash())
.log();
if (pluginServiceProvider.isServiceAvailable(BlockchainService.class)) {
final BlockchainService service = pluginServiceProvider.getService(BlockchainService.class);
final Hash safeBlock =
Expand All @@ -45,6 +52,8 @@ public void onBlockAdded(final AddedBlockContext addedBlockContext) {
service.getFinalizedBlock().orElse(addedBlockContext.getBlockHeader().getBlockHash());
stateShipNewHeadSender.sendData(
new NewHeadParams(addedBlockContext.getBlockHeader(), safeBlock, finalizedBlock));
} else {
LOG.error("BlockchainService is not available");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import java.util.concurrent.CompletableFuture;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FleetShipNewHeadClient extends AbstractStateRpcSender<NewHeadParams, Boolean> {

private static final String METHOD_NAME = "fleet_shipNewHead";
private static final Logger LOG = LoggerFactory.getLogger(FleetShipNewHeadClient.class);

public FleetShipNewHeadClient(final WebClientWrapper webClient) {
super(webClient);
Expand All @@ -37,11 +40,13 @@ protected String getMethodeName() {

@Override
public CompletableFuture<Boolean> sendData(final NewHeadParams data) {
LOG.info("Sending new head to followers");
final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
try {
webClient.sendToFollowers(ENDPOINT, getMethodeName(), data);
completableFuture.complete(true);
} catch (JsonProcessingException e) {
LOG.error("Error sending new head to followers", e);
completableFuture.complete(false);
}
return completableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ public FleetAddFollowerServer(final PeerNodesManager followerNodesManager) {
this.followerNodesManager = followerNodesManager;
}

@Override
public String getNamespace() {
return "fleet";
}

@Override
public String getName() {
return "addFollowerNode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ public FleetGetBlockServer(
this.pluginServiceProvider = pluginServiceProvider;
}

@Override
public String getNamespace() {
return "fleet";
}

@Override
public String getName() {
return "getBlock";
Expand Down