Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ A migration will be performed when starting v1.4 for the first time to reprocess
and re-create the private state data in the v1.4 format.
If you have existing private transactions, see [migration details](docs/Private-Txns-Migration.md).

### Additions and Improvements

- Automatic Transaction Log Bloom Filter Caching

Add a new option `--auto-logs-bloom-indexing-enabled` which defaults to true. This performs the equivalent of the `operator generate-log-bloom-cache` CLI task or `admin_generateLogBloomCache` RPC call for each block as it arrives, in addition to caching older logs on first startup.

## 1.4.0 RC-1

### Additions and Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ public void startNode(final BesuNode node) {
params.add("--key-value-storage");
params.add("rocksdb");

params.add("--auto-log-bloom-caching-enabled");
params.add("false");

LOG.info("Creating besu process with params {}", params);
final ProcessBuilder processBuilder =
new ProcessBuilder(params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void startNode(final BesuNode node) {
.map(EnodeURL::fromString)
.collect(Collectors.toList()))
.besuPluginContext(new BesuPluginContextImpl())
.autoLogBloomCaching(false)
.build();

runner.start();
Expand Down
15 changes: 13 additions & 2 deletions besu/src/main/java/org/hyperledger/besu/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogBloomCachingService;
import org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.stratum.StratumServer;
Expand Down Expand Up @@ -58,6 +61,8 @@ public class Runner implements AutoCloseable {
private final BesuController<?> besuController;
private final Path dataDir;
private final Optional<StratumServer> stratumServer;
private final Optional<AutoTransactionLogBloomCachingService>
autoTransactionLogBloomCachingService;

Runner(
final Vertx vertx,
Expand All @@ -69,7 +74,9 @@ public class Runner implements AutoCloseable {
final Optional<StratumServer> stratumServer,
final Optional<MetricsService> metrics,
final BesuController<?> besuController,
final Path dataDir) {
final Path dataDir,
final Optional<TransactionLogBloomCacher> transactionLogBloomCacher,
final Blockchain blockchain) {
this.vertx = vertx;
this.networkRunner = networkRunner;
this.natService = natService;
Expand All @@ -80,6 +87,9 @@ public class Runner implements AutoCloseable {
this.besuController = besuController;
this.dataDir = dataDir;
this.stratumServer = stratumServer;
this.autoTransactionLogBloomCachingService =
transactionLogBloomCacher.map(
cacher -> new AutoTransactionLogBloomCachingService(blockchain, cacher));
}

public void start() {
Expand All @@ -103,6 +113,7 @@ public void start() {
LOG.info("Ethereum main loop is up.");
writeBesuPortsToFile();
writeBesuNetworksToFile();
autoTransactionLogBloomCachingService.ifPresent(AutoTransactionLogBloomCachingService::start);
} catch (final Exception ex) {
LOG.error("Startup failed", ex);
throw new IllegalStateException(ex);
Expand All @@ -125,7 +136,7 @@ public void stop() {

networkRunner.stop();
waitForServiceToStop("Network", networkRunner::awaitStop);

autoTransactionLogBloomCachingService.ifPresent(AutoTransactionLogBloomCachingService::stop);
natService.stop();
besuController.close();
vertx.close((res) -> vertxShutdownLatch.countDown());
Expand Down
10 changes: 9 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class RunnerBuilder {
private Collection<EnodeURL> staticNodes = Collections.emptyList();
private Optional<String> identityString = Optional.empty();
private BesuPluginContextImpl besuPluginContext;
private boolean autoLogBloomCaching = true;

public RunnerBuilder vertx(final Vertx vertx) {
this.vertx = vertx;
Expand Down Expand Up @@ -270,6 +271,11 @@ public RunnerBuilder besuPluginContext(final BesuPluginContextImpl besuPluginCon
return this;
}

public RunnerBuilder autoLogBloomCaching(final boolean autoLogBloomCaching) {
this.autoLogBloomCaching = autoLogBloomCaching;
return this;
}

public Runner build() {

Preconditions.checkNotNull(besuController);
Expand Down Expand Up @@ -527,7 +533,9 @@ public Runner build() {
stratumServer,
metricsService,
besuController,
dataDir);
dataDir,
autoLogBloomCaching ? blockchainQueries.getTransactionLogBloomCacher() : Optional.empty(),
context.getBlockchain());
}

private Optional<NodePermissioningController> buildNodePermissioningController(
Expand Down
7 changes: 7 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,12 @@ void setBannedNodeIds(final List<String> values) {
arity = "1")
private String keyValueStorageName = DEFAULT_KEY_VALUE_STORAGE_NAME;

@Option(
names = {"--auto-log-bloom-caching-enabled"},
description = "Enable automatic log bloom caching (default: ${DEFAULT-VALUE})",
arity = "1")
private final Boolean autoLogBloomCachingEnabled = true;

@Option(
names = {"--override-genesis-config"},
paramLabel = "NAME=VALUE",
Expand Down Expand Up @@ -1717,6 +1723,7 @@ private void synchronize(
.staticNodes(staticNodes)
.identityString(identityString)
.besuPluginContext(besuPluginContext)
.autoLogBloomCaching(autoLogBloomCachingEnabled)
.build();

addShutdownHook(runner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.hyperledger.besu.cli.DefaultCommandValues.MANDATORY_LONG_FORMAT_HELP;
import static org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer.BLOCKS_PER_BLOOM_CACHE;
import static org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.BLOCKS_PER_BLOOM_CACHE;

import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer;
import org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
Expand All @@ -43,7 +43,7 @@ public class GenerateLogBloomCache implements Runnable {
names = "--start-block",
paramLabel = MANDATORY_LONG_FORMAT_HELP,
description =
"The block to start generating indexes. Must be an increment of "
"The block to start generating the cache. Must be an increment of "
+ BLOCKS_PER_BLOOM_CACHE
+ " (default: ${DEFAULT-VALUE})",
arity = "1..1")
Expand All @@ -52,7 +52,7 @@ public class GenerateLogBloomCache implements Runnable {
@Option(
names = "--end-block",
paramLabel = MANDATORY_LONG_FORMAT_HELP,
description = "The block to stop generating indexes (default is last block of the chain).",
description = "The block to stop generating the cache (default is last block of the chain).",
arity = "1..1")
private final Long endBlock = Long.MAX_VALUE;

Expand All @@ -69,9 +69,9 @@ public void run() {
final EthScheduler scheduler = new EthScheduler(1, 1, 1, 1, new NoOpMetricsSystem());
try {
final long finalBlock = Math.min(blockchain.getChainHeadBlockNumber(), endBlock);
final TransactionLogsIndexer indexer =
new TransactionLogsIndexer(blockchain, cacheDir, scheduler);
indexer.generateLogBloomCache(startBlock, finalBlock);
final TransactionLogBloomCacher cacher =
new TransactionLogBloomCacher(blockchain, cacheDir, scheduler);
cacher.generateLogBloomCache(startBlock, finalBlock);
} finally {
scheduler.stop();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void callingBesuCommandWithoutOptionsMustSyncWithDefaultValues() throws E
verify(mockRunnerBuilder).webSocketConfiguration(eq(DEFAULT_WEB_SOCKET_CONFIGURATION));
verify(mockRunnerBuilder).metricsConfiguration(eq(DEFAULT_METRICS_CONFIGURATION));
verify(mockRunnerBuilder).ethNetworkConfig(ethNetworkArg.capture());
verify(mockRunnerBuilder).autoLogBloomCaching(eq(true));
verify(mockRunnerBuilder).build();

verify(mockControllerBuilderFactory).fromEthNetworkConfig(ethNetworkArg.capture(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public void initMocks() throws Exception {
when(mockRunnerBuilder.staticNodes(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.identityString(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.besuPluginContext(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.autoLogBloomCaching(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.build()).thenReturn(mockRunner);

lenient()
Expand Down
5 changes: 4 additions & 1 deletion besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,7 @@ revert-reason-enabled=false
key-value-storage="rocksdb"

# Gas limit
target-gas-limit=8000000
target-gas-limit=8000000

# transaction log bloom filter caching
auto-log-bloom-caching-enabled=true
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public JsonRpcResponse response(final JsonRpcRequestContext requestContext) {
return new JsonRpcSuccessResponse(
requestContext.getRequest().getId(),
blockchainQueries
.getTransactionLogsIndexer()
.map(indexer -> indexer.requestIndexing(startBlock, stopBlock))
.getTransactionLogBloomCacher()
.map(cacher -> cacher.requestCaching(startBlock, stopBlock))
.orElse(null));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.query;

import org.hyperledger.besu.ethereum.chain.Blockchain;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Optional;
import java.util.OptionalLong;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AutoTransactionLogBloomCachingService {
protected static final Logger LOG = LogManager.getLogger();
private final Blockchain blockchain;
private final TransactionLogBloomCacher transactionLogBloomCacher;
private OptionalLong blockAddedSubscriptionId = OptionalLong.empty();
private OptionalLong chainReorgSubscriptionId = OptionalLong.empty();

public AutoTransactionLogBloomCachingService(
final Blockchain blockchain, final TransactionLogBloomCacher transactionLogBloomCacher) {
this.blockchain = blockchain;
this.transactionLogBloomCacher = transactionLogBloomCacher;
}

public void start() {
try {
LOG.info("Starting auto transaction log bloom caching service.");
final Path cacheDir = transactionLogBloomCacher.getCacheDir();
if (!cacheDir.toFile().exists() || !cacheDir.toFile().isDirectory()) {
Files.createDirectory(cacheDir);
}
blockAddedSubscriptionId =
OptionalLong.of(
blockchain.observeBlockAdded(
(event, __) -> {
if (event.isNewCanonicalHead()) {
transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
event.getBlock().getHeader(), Optional.empty(), true);
}
}));
chainReorgSubscriptionId =
OptionalLong.of(
blockchain.observeChainReorg(
(header, __) ->
transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
header, Optional.empty(), true)));

transactionLogBloomCacher
.getScheduler()
.scheduleFutureTask(transactionLogBloomCacher::cacheAll, Duration.ofMinutes(1));
} catch (IOException e) {
LOG.error("Unhandled caching exception.", e);
}
}

public void stop() {
LOG.info("Shutting down Auto transaction logs caching service.");
blockAddedSubscriptionId.ifPresent(blockchain::removeObserver);
chainReorgSubscriptionId.ifPresent(blockchain::removeChainReorgObserver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.api.query;

import static com.google.common.base.Preconditions.checkArgument;
import static org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer.BLOCKS_PER_BLOOM_CACHE;
import static org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.BLOCKS_PER_BLOOM_CACHE;

import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.chain.TransactionLocation;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class BlockchainQueries {
private final WorldStateArchive worldStateArchive;
private final Blockchain blockchain;
private final Optional<Path> cachePath;
private final Optional<TransactionLogsIndexer> transactionLogsIndexer;
private final Optional<TransactionLogBloomCacher> transactionLogBloomCacher;

public BlockchainQueries(final Blockchain blockchain, final WorldStateArchive worldStateArchive) {
this(blockchain, worldStateArchive, Optional.empty(), Optional.empty());
Expand All @@ -82,9 +82,10 @@ public BlockchainQueries(
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.cachePath = cachePath;
this.transactionLogsIndexer =
this.transactionLogBloomCacher =
(cachePath.isPresent() && scheduler.isPresent())
? Optional.of(new TransactionLogsIndexer(blockchain, cachePath.get(), scheduler.get()))
? Optional.of(
new TransactionLogBloomCacher(blockchain, cachePath.get(), scheduler.get()))
: Optional.empty();
}

Expand All @@ -96,8 +97,8 @@ public WorldStateArchive getWorldStateArchive() {
return worldStateArchive;
}

public Optional<TransactionLogsIndexer> getTransactionLogsIndexer() {
return transactionLogsIndexer;
public Optional<TransactionLogBloomCacher> getTransactionLogBloomCacher() {
return transactionLogBloomCacher;
}

/**
Expand Down
Loading