diff --git a/CHANGELOG.md b/CHANGELOG.md index c87bd5b03ac..48d834464fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ A migration will be performed when starting v1.4 for the first time to reprocess and re-create the private state data in the v1.4 format. If you have existing private transactions, see [migration details](docs/Private-Txns-Migration.md). +### Additions and Improvements + +- Automatic Transaction Log Bloom Filter Caching + +Add a new option `--auto-logs-bloom-indexing-enabled` which defaults to true. This performs the equivalent of the `operator generate-log-bloom-cache` CLI task or `admin_generateLogBloomCache` RPC call for each block as it arrives, in addition to caching older logs on first startup. + ## 1.4.0 RC-1 ### Additions and Improvements diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java index ae27c30e130..73bc5c43ff0 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java @@ -257,6 +257,9 @@ public void startNode(final BesuNode node) { params.add("--key-value-storage"); params.add("rocksdb"); + params.add("--auto-log-bloom-caching-enabled"); + params.add("false"); + LOG.info("Creating besu process with params {}", params); final ProcessBuilder processBuilder = new ProcessBuilder(params) diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java index 68e0d3cbeab..a73779aee6f 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java @@ -195,6 +195,7 @@ public void startNode(final BesuNode node) { .map(EnodeURL::fromString) .collect(Collectors.toList())) .besuPluginContext(new BesuPluginContextImpl()) + .autoLogBloomCaching(false) .build(); runner.start(); diff --git a/besu/src/main/java/org/hyperledger/besu/Runner.java b/besu/src/main/java/org/hyperledger/besu/Runner.java index 08d8880bf7e..f298611ad7c 100644 --- a/besu/src/main/java/org/hyperledger/besu/Runner.java +++ b/besu/src/main/java/org/hyperledger/besu/Runner.java @@ -18,6 +18,9 @@ import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService; import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService; +import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogBloomCachingService; +import org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.ethereum.stratum.StratumServer; @@ -58,6 +61,8 @@ public class Runner implements AutoCloseable { private final BesuController besuController; private final Path dataDir; private final Optional stratumServer; + private final Optional + autoTransactionLogBloomCachingService; Runner( final Vertx vertx, @@ -69,7 +74,9 @@ public class Runner implements AutoCloseable { final Optional stratumServer, final Optional metrics, final BesuController besuController, - final Path dataDir) { + final Path dataDir, + final Optional transactionLogBloomCacher, + final Blockchain blockchain) { this.vertx = vertx; this.networkRunner = networkRunner; this.natService = natService; @@ -80,6 +87,9 @@ public class Runner implements AutoCloseable { this.besuController = besuController; this.dataDir = dataDir; this.stratumServer = stratumServer; + this.autoTransactionLogBloomCachingService = + transactionLogBloomCacher.map( + cacher -> new AutoTransactionLogBloomCachingService(blockchain, cacher)); } public void start() { @@ -103,6 +113,7 @@ public void start() { LOG.info("Ethereum main loop is up."); writeBesuPortsToFile(); writeBesuNetworksToFile(); + autoTransactionLogBloomCachingService.ifPresent(AutoTransactionLogBloomCachingService::start); } catch (final Exception ex) { LOG.error("Startup failed", ex); throw new IllegalStateException(ex); @@ -125,7 +136,7 @@ public void stop() { networkRunner.stop(); waitForServiceToStop("Network", networkRunner::awaitStop); - + autoTransactionLogBloomCachingService.ifPresent(AutoTransactionLogBloomCachingService::stop); natService.stop(); besuController.close(); vertx.close((res) -> vertxShutdownLatch.countDown()); diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index ff65476fde9..243da5cc07e 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -144,6 +144,7 @@ public class RunnerBuilder { private Collection staticNodes = Collections.emptyList(); private Optional identityString = Optional.empty(); private BesuPluginContextImpl besuPluginContext; + private boolean autoLogBloomCaching = true; public RunnerBuilder vertx(final Vertx vertx) { this.vertx = vertx; @@ -270,6 +271,11 @@ public RunnerBuilder besuPluginContext(final BesuPluginContextImpl besuPluginCon return this; } + public RunnerBuilder autoLogBloomCaching(final boolean autoLogBloomCaching) { + this.autoLogBloomCaching = autoLogBloomCaching; + return this; + } + public Runner build() { Preconditions.checkNotNull(besuController); @@ -527,7 +533,9 @@ public Runner build() { stratumServer, metricsService, besuController, - dataDir); + dataDir, + autoLogBloomCaching ? blockchainQueries.getTransactionLogBloomCacher() : Optional.empty(), + context.getBlockchain()); } private Optional buildNodePermissioningController( diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index cc0297a6424..784d4b90d35 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -800,6 +800,12 @@ void setBannedNodeIds(final List values) { arity = "1") private String keyValueStorageName = DEFAULT_KEY_VALUE_STORAGE_NAME; + @Option( + names = {"--auto-log-bloom-caching-enabled"}, + description = "Enable automatic log bloom caching (default: ${DEFAULT-VALUE})", + arity = "1") + private final Boolean autoLogBloomCachingEnabled = true; + @Option( names = {"--override-genesis-config"}, paramLabel = "NAME=VALUE", @@ -1717,6 +1723,7 @@ private void synchronize( .staticNodes(staticNodes) .identityString(identityString) .besuPluginContext(besuPluginContext) + .autoLogBloomCaching(autoLogBloomCachingEnabled) .build(); addShutdownHook(runner); diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/GenerateLogBloomCache.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/GenerateLogBloomCache.java index aeb089fe150..f2f6e4af69b 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/GenerateLogBloomCache.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/operator/GenerateLogBloomCache.java @@ -19,10 +19,10 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.hyperledger.besu.cli.DefaultCommandValues.MANDATORY_LONG_FORMAT_HELP; -import static org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer.BLOCKS_PER_BLOOM_CACHE; +import static org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.BLOCKS_PER_BLOOM_CACHE; import org.hyperledger.besu.controller.BesuController; -import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer; +import org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; @@ -43,7 +43,7 @@ public class GenerateLogBloomCache implements Runnable { names = "--start-block", paramLabel = MANDATORY_LONG_FORMAT_HELP, description = - "The block to start generating indexes. Must be an increment of " + "The block to start generating the cache. Must be an increment of " + BLOCKS_PER_BLOOM_CACHE + " (default: ${DEFAULT-VALUE})", arity = "1..1") @@ -52,7 +52,7 @@ public class GenerateLogBloomCache implements Runnable { @Option( names = "--end-block", paramLabel = MANDATORY_LONG_FORMAT_HELP, - description = "The block to stop generating indexes (default is last block of the chain).", + description = "The block to stop generating the cache (default is last block of the chain).", arity = "1..1") private final Long endBlock = Long.MAX_VALUE; @@ -69,9 +69,9 @@ public void run() { final EthScheduler scheduler = new EthScheduler(1, 1, 1, 1, new NoOpMetricsSystem()); try { final long finalBlock = Math.min(blockchain.getChainHeadBlockNumber(), endBlock); - final TransactionLogsIndexer indexer = - new TransactionLogsIndexer(blockchain, cacheDir, scheduler); - indexer.generateLogBloomCache(startBlock, finalBlock); + final TransactionLogBloomCacher cacher = + new TransactionLogBloomCacher(blockchain, cacheDir, scheduler); + cacher.generateLogBloomCache(startBlock, finalBlock); } finally { scheduler.stop(); try { diff --git a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java index 07c94415aeb..b65bcfd4185 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java @@ -175,6 +175,7 @@ public void callingBesuCommandWithoutOptionsMustSyncWithDefaultValues() throws E verify(mockRunnerBuilder).webSocketConfiguration(eq(DEFAULT_WEB_SOCKET_CONFIGURATION)); verify(mockRunnerBuilder).metricsConfiguration(eq(DEFAULT_METRICS_CONFIGURATION)); verify(mockRunnerBuilder).ethNetworkConfig(ethNetworkArg.capture()); + verify(mockRunnerBuilder).autoLogBloomCaching(eq(true)); verify(mockRunnerBuilder).build(); verify(mockControllerBuilderFactory).fromEthNetworkConfig(ethNetworkArg.capture(), any()); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java index 812ac3f034f..36678d287f7 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java @@ -223,6 +223,7 @@ public void initMocks() throws Exception { when(mockRunnerBuilder.staticNodes(any())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.identityString(any())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.besuPluginContext(any())).thenReturn(mockRunnerBuilder); + when(mockRunnerBuilder.autoLogBloomCaching(anyBoolean())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.build()).thenReturn(mockRunner); lenient() diff --git a/besu/src/test/resources/everything_config.toml b/besu/src/test/resources/everything_config.toml index 3a9779c2aef..f776ca43a80 100644 --- a/besu/src/test/resources/everything_config.toml +++ b/besu/src/test/resources/everything_config.toml @@ -135,4 +135,7 @@ revert-reason-enabled=false key-value-storage="rocksdb" # Gas limit -target-gas-limit=8000000 \ No newline at end of file +target-gas-limit=8000000 + +# transaction log bloom filter caching +auto-log-bloom-caching-enabled=true \ No newline at end of file diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCache.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCache.java index 24d6bdab150..3cb5e77127d 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCache.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCache.java @@ -73,8 +73,8 @@ public JsonRpcResponse response(final JsonRpcRequestContext requestContext) { return new JsonRpcSuccessResponse( requestContext.getRequest().getId(), blockchainQueries - .getTransactionLogsIndexer() - .map(indexer -> indexer.requestIndexing(startBlock, stopBlock)) + .getTransactionLogBloomCacher() + .map(cacher -> cacher.requestCaching(startBlock, stopBlock)) .orElse(null)); } } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogBloomCachingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogBloomCachingService.java new file mode 100644 index 00000000000..a500720a32d --- /dev/null +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogBloomCachingService.java @@ -0,0 +1,78 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.api.query; + +import org.hyperledger.besu.ethereum.chain.Blockchain; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class AutoTransactionLogBloomCachingService { + protected static final Logger LOG = LogManager.getLogger(); + private final Blockchain blockchain; + private final TransactionLogBloomCacher transactionLogBloomCacher; + private OptionalLong blockAddedSubscriptionId = OptionalLong.empty(); + private OptionalLong chainReorgSubscriptionId = OptionalLong.empty(); + + public AutoTransactionLogBloomCachingService( + final Blockchain blockchain, final TransactionLogBloomCacher transactionLogBloomCacher) { + this.blockchain = blockchain; + this.transactionLogBloomCacher = transactionLogBloomCacher; + } + + public void start() { + try { + LOG.info("Starting auto transaction log bloom caching service."); + final Path cacheDir = transactionLogBloomCacher.getCacheDir(); + if (!cacheDir.toFile().exists() || !cacheDir.toFile().isDirectory()) { + Files.createDirectory(cacheDir); + } + blockAddedSubscriptionId = + OptionalLong.of( + blockchain.observeBlockAdded( + (event, __) -> { + if (event.isNewCanonicalHead()) { + transactionLogBloomCacher.cacheLogsBloomForBlockHeader( + event.getBlock().getHeader(), Optional.empty(), true); + } + })); + chainReorgSubscriptionId = + OptionalLong.of( + blockchain.observeChainReorg( + (header, __) -> + transactionLogBloomCacher.cacheLogsBloomForBlockHeader( + header, Optional.empty(), true))); + + transactionLogBloomCacher + .getScheduler() + .scheduleFutureTask(transactionLogBloomCacher::cacheAll, Duration.ofMinutes(1)); + } catch (IOException e) { + LOG.error("Unhandled caching exception.", e); + } + } + + public void stop() { + LOG.info("Shutting down Auto transaction logs caching service."); + blockAddedSubscriptionId.ifPresent(blockchain::removeObserver); + chainReorgSubscriptionId.ifPresent(blockchain::removeChainReorgObserver); + } +} diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java index de70adb5b0b..4b49d760e8e 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java @@ -15,7 +15,7 @@ package org.hyperledger.besu.ethereum.api.query; import static com.google.common.base.Preconditions.checkArgument; -import static org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer.BLOCKS_PER_BLOOM_CACHE; +import static org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.BLOCKS_PER_BLOOM_CACHE; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.TransactionLocation; @@ -61,7 +61,7 @@ public class BlockchainQueries { private final WorldStateArchive worldStateArchive; private final Blockchain blockchain; private final Optional cachePath; - private final Optional transactionLogsIndexer; + private final Optional transactionLogBloomCacher; public BlockchainQueries(final Blockchain blockchain, final WorldStateArchive worldStateArchive) { this(blockchain, worldStateArchive, Optional.empty(), Optional.empty()); @@ -82,9 +82,10 @@ public BlockchainQueries( this.blockchain = blockchain; this.worldStateArchive = worldStateArchive; this.cachePath = cachePath; - this.transactionLogsIndexer = + this.transactionLogBloomCacher = (cachePath.isPresent() && scheduler.isPresent()) - ? Optional.of(new TransactionLogsIndexer(blockchain, cachePath.get(), scheduler.get())) + ? Optional.of( + new TransactionLogBloomCacher(blockchain, cachePath.get(), scheduler.get())) : Optional.empty(); } @@ -96,8 +97,8 @@ public WorldStateArchive getWorldStateArchive() { return worldStateArchive; } - public Optional getTransactionLogsIndexer() { - return transactionLogsIndexer; + public Optional getTransactionLogBloomCacher() { + return transactionLogBloomCacher; } /** diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogBloomCacher.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogBloomCacher.java new file mode 100644 index 00000000000..c8e6cc6286e --- /dev/null +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogBloomCacher.java @@ -0,0 +1,292 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.ethereum.api.query; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class TransactionLogBloomCacher { + + private static final Logger LOG = LogManager.getLogger(); + + public static final int BLOCKS_PER_BLOOM_CACHE = 100_000; + private static final int BLOOM_BITS_LENGTH = 256; + public static final String CURRENT = "current"; + private final Map cachedSegments; + + private final Lock submissionLock = new ReentrantLock(); + + private final EthScheduler scheduler; + private final Blockchain blockchain; + + private final Path cacheDir; + + private final CachingStatus cachingStatus = new CachingStatus(); + + public TransactionLogBloomCacher( + final Blockchain blockchain, final Path cacheDir, final EthScheduler scheduler) { + this.blockchain = blockchain; + this.cacheDir = cacheDir; + this.scheduler = scheduler; + this.cachedSegments = new TreeMap<>(); + } + + void cacheAll() { + ensurePreviousSegmentsArePresent(blockchain.getChainHeadBlockNumber()); + } + + private static File calculateCacheFileName(final String name, final Path cacheDir) { + return cacheDir.resolve("logBloom-" + name + ".cache").toFile(); + } + + private static File calculateCacheFileName(final long blockNumber, final Path cacheDir) { + return calculateCacheFileName(Long.toString(blockNumber / BLOCKS_PER_BLOOM_CACHE), cacheDir); + } + + public CachingStatus generateLogBloomCache(final long start, final long stop) { + checkArgument( + start % BLOCKS_PER_BLOOM_CACHE == 0, "Start block must be at the beginning of a file"); + try { + cachingStatus.cachingCount.incrementAndGet(); + LOG.info( + "Generating transaction log bloom cache from block {} to block {} in {}", + start, + stop, + cacheDir); + if (!Files.isDirectory(cacheDir) && !cacheDir.toFile().mkdirs()) { + LOG.error("Cache directory '{}' does not exist and could not be made.", cacheDir); + return cachingStatus; + } + for (long blockNum = start; blockNum < stop; blockNum += BLOCKS_PER_BLOOM_CACHE) { + LOG.info("Caching segment at {}", blockNum); + final File cacheFile = calculateCacheFileName(blockNum, cacheDir); + blockchain + .getBlockHeader(blockNum) + .ifPresent( + blockHeader -> + cacheLogsBloomForBlockHeader(blockHeader, Optional.of(cacheFile), false)); + try (final OutputStream os = new FileOutputStream(cacheFile)) { + fillCacheFile(blockNum, blockNum + BLOCKS_PER_BLOOM_CACHE, os); + } + } + } catch (final Exception e) { + LOG.error("Unhandled caching exception", e); + } finally { + cachingStatus.cachingCount.decrementAndGet(); + LOG.info("Caching request complete"); + } + return cachingStatus; + } + + private void fillCacheFile(final long startBlock, final long stopBlock, final OutputStream fos) + throws IOException { + long blockNum = startBlock; + while (blockNum < stopBlock) { + final Optional maybeHeader = blockchain.getBlockHeader(blockNum); + if (maybeHeader.isEmpty()) { + break; + } + fillCacheFileWithBlock(maybeHeader.get(), fos); + cachingStatus.currentBlock = blockNum; + blockNum++; + } + } + + void cacheLogsBloomForBlockHeader( + final BlockHeader blockHeader, + final Optional reusedCacheFile, + final boolean ensureChecks) { + try { + if (cachingStatus.cachingCount.incrementAndGet() != 1) { + return; + } + final long blockNumber = blockHeader.getNumber(); + LOG.debug("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); + if (ensureChecks) { + ensurePreviousSegmentsArePresent(blockNumber); + } + final File cacheFile = reusedCacheFile.orElse(calculateCacheFileName(blockNumber, cacheDir)); + if (cacheFile.exists()) { + cacheSingleBlock(blockHeader, cacheFile); + } else { + scheduler.scheduleComputationTask(this::populateLatestSegment); + } + } catch (final IOException e) { + LOG.error("Unhandled caching exception.", e); + } finally { + cachingStatus.cachingCount.decrementAndGet(); + } + } + + private void cacheSingleBlock(final BlockHeader blockHeader, final File cacheFile) + throws IOException { + try (final RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) { + final long offset = (blockHeader.getNumber() % BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH; + writer.seek(offset); + writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } + } + + private boolean populateLatestSegment() { + try { + long blockNumber = blockchain.getChainHeadBlockNumber(); + final File currentFile = calculateCacheFileName(CURRENT, cacheDir); + final long segmentNumber = blockNumber / BLOCKS_PER_BLOOM_CACHE; + try (final OutputStream out = new FileOutputStream(currentFile)) { + fillCacheFile(segmentNumber * BLOCKS_PER_BLOOM_CACHE, blockNumber, out); + } + while (blockNumber <= blockchain.getChainHeadBlockNumber() + && (blockNumber % BLOCKS_PER_BLOOM_CACHE != 0)) { + cacheSingleBlock(blockchain.getBlockHeader(blockNumber).orElseThrow(), currentFile); + blockNumber++; + } + Files.move( + currentFile.toPath(), + calculateCacheFileName(blockNumber, cacheDir).toPath(), + StandardCopyOption.REPLACE_EXISTING, + StandardCopyOption.ATOMIC_MOVE); + return true; + } catch (final IOException e) { + LOG.error("Unhandled caching exception.", e); + return false; + } + } + + private void ensurePreviousSegmentsArePresent(final long blockNumber) { + if (!cachingStatus.isCaching()) { + scheduler.scheduleFutureTask( + () -> { + long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1; + while (currentSegment > 0) { + try { + if (!cachedSegments.getOrDefault(currentSegment, false)) { + final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE; + generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE); + cachedSegments.put(currentSegment, true); + } + } finally { + currentSegment--; + } + } + }, + Duration.ofSeconds(1)); + } + } + + private void fillCacheFileWithBlock(final BlockHeader blockHeader, final OutputStream fos) + throws IOException { + fos.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } + + private byte[] ensureBloomBitsAreCorrectLength(final byte[] logs) { + checkNotNull(logs); + checkState(logs.length == BLOOM_BITS_LENGTH, "BloomBits are not the correct length"); + return logs; + } + + public CachingStatus requestCaching(final long fromBlock, final long toBlock) { + boolean requestAccepted = false; + try { + if ((fromBlock < toBlock) && submissionLock.tryLock(100, TimeUnit.MILLISECONDS)) { + try { + if (!cachingStatus.isCaching()) { + requestAccepted = true; + cachingStatus.startBlock = fromBlock; + cachingStatus.endBlock = toBlock; + scheduler.scheduleComputationTask( + () -> + generateLogBloomCache( + fromBlock - (fromBlock % BLOCKS_PER_BLOOM_CACHE), toBlock)); + } + } finally { + submissionLock.unlock(); + } + } + } catch (final InterruptedException e) { + // ignore + } + cachingStatus.requestAccepted = requestAccepted; + return cachingStatus; + } + + EthScheduler getScheduler() { + return scheduler; + } + + Path getCacheDir() { + return cacheDir; + } + + public static final class CachingStatus { + long startBlock; + long endBlock; + volatile long currentBlock; + AtomicInteger cachingCount = new AtomicInteger(0); + boolean requestAccepted; + + @JsonGetter + public String getStartBlock() { + return "0x" + Long.toHexString(startBlock); + } + + @JsonGetter + public String getEndBlock() { + return endBlock == Long.MAX_VALUE ? "latest" : "0x" + Long.toHexString(endBlock); + } + + @JsonGetter + public String getCurrentBlock() { + return "0x" + Long.toHexString(currentBlock); + } + + @JsonGetter + public boolean isCaching() { + return cachingCount.get() > 0; + } + + @JsonGetter + public boolean isRequestAccepted() { + return requestAccepted; + } + } +} diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java deleted file mode 100644 index 11223e04c0d..00000000000 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.hyperledger.besu.ethereum.api.query; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import org.hyperledger.besu.ethereum.chain.Blockchain; -import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class TransactionLogsIndexer { - - private static final Logger LOG = LogManager.getLogger(); - - public static final int BLOCKS_PER_BLOOM_CACHE = 100_000; - public static final String PENDING = "pending"; - - private final Lock submissionLock = new ReentrantLock(); - private final EthScheduler scheduler; - private final Blockchain blockchain; - private final Path cacheDir; - - private final IndexingStatus indexingStatus = new IndexingStatus(); - - public TransactionLogsIndexer( - final Blockchain blockchain, final Path cacheDir, final EthScheduler scheduler) { - this.blockchain = blockchain; - this.cacheDir = cacheDir; - this.scheduler = scheduler; - } - - private static File calculateCacheFileName(final String name, final Path cacheDir) { - return cacheDir.resolve("logBloom-" + name + ".cache").toFile(); - } - - private static File calculateCacheFileName(final long blockNumber, final Path cacheDir) { - return calculateCacheFileName(Long.toString(blockNumber / BLOCKS_PER_BLOOM_CACHE), cacheDir); - } - - public IndexingStatus generateLogBloomCache(final long start, final long stop) { - checkArgument( - start % BLOCKS_PER_BLOOM_CACHE == 0, "Start block must be at the beginning of a file"); - try { - indexingStatus.indexing = true; - LOG.info( - "Generating transaction log indexes from block {} to block {} in {}", - start, - stop, - cacheDir); - if (!Files.isDirectory(cacheDir) && !cacheDir.toFile().mkdirs()) { - LOG.error("Cache directory '{}' does not exist and could not be made.", cacheDir); - return indexingStatus; - } - - final File pendingFile = calculateCacheFileName(PENDING, cacheDir); - for (long blockNum = start; blockNum < stop; blockNum += BLOCKS_PER_BLOOM_CACHE) { - LOG.info("Indexing segment at {}", blockNum); - try (final FileOutputStream fos = new FileOutputStream(pendingFile)) { - final long blockCount = fillCacheFile(blockNum, blockNum + BLOCKS_PER_BLOOM_CACHE, fos); - if (blockCount == BLOCKS_PER_BLOOM_CACHE) { - Files.move( - pendingFile.toPath(), - calculateCacheFileName(blockNum, cacheDir).toPath(), - StandardCopyOption.REPLACE_EXISTING, - StandardCopyOption.ATOMIC_MOVE); - } else { - LOG.info("Partial segment at {}, only {} blocks cached", blockNum, blockCount); - break; - } - } - } - } catch (final Exception e) { - LOG.error("Unhandled indexing exception", e); - } finally { - indexingStatus.indexing = false; - LOG.info("Indexing request complete"); - } - return indexingStatus; - } - - private long fillCacheFile( - final long startBlock, final long stopBlock, final FileOutputStream fos) throws IOException { - long blockNum = startBlock; - while (blockNum < stopBlock) { - final Optional maybeHeader = blockchain.getBlockHeader(blockNum); - if (maybeHeader.isEmpty()) { - break; - } - final byte[] logs = maybeHeader.get().getLogsBloom().toArray(); - checkNotNull(logs); - checkState(logs.length == 256, "BloomBits are not the correct length"); - fos.write(logs); - indexingStatus.currentBlock = blockNum; - blockNum++; - } - return blockNum - startBlock; - } - - public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) { - boolean requestAccepted = false; - try { - if ((fromBlock < toBlock) && submissionLock.tryLock(100, TimeUnit.MILLISECONDS)) { - try { - if (!indexingStatus.indexing) { - requestAccepted = true; - indexingStatus.startBlock = fromBlock; - indexingStatus.endBlock = toBlock; - scheduler.scheduleComputationTask( - () -> - generateLogBloomCache( - fromBlock - (fromBlock % BLOCKS_PER_BLOOM_CACHE), toBlock)); - } - } finally { - submissionLock.unlock(); - } - } - } catch (final InterruptedException e) { - // ignore - } - indexingStatus.requestAccepted = requestAccepted; - return indexingStatus; - } - - public static final class IndexingStatus { - long startBlock; - long endBlock; - volatile long currentBlock; - volatile boolean indexing; - boolean requestAccepted; - - @JsonGetter - public String getStartBlock() { - return "0x" + Long.toHexString(startBlock); - } - - @JsonGetter - public String getEndBlock() { - return endBlock == Long.MAX_VALUE ? "latest" : "0x" + Long.toHexString(endBlock); - } - - @JsonGetter - public String getCurrentBlock() { - return "0x" + Long.toHexString(currentBlock); - } - - @JsonGetter - public boolean isIndexing() { - return indexing; - } - - @JsonGetter - public boolean isRequestAccepted() { - return requestAccepted; - } - } -} diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCacheTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCacheTest.java index 34b7c984a69..f9e734fd93a 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCacheTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/AdminGenerateLogBloomCacheTest.java @@ -23,8 +23,8 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse; import org.hyperledger.besu.ethereum.api.query.BlockchainQueries; -import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer; -import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer.IndexingStatus; +import org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher; +import org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.CachingStatus; import java.util.List; import java.util.Optional; @@ -41,7 +41,7 @@ public class AdminGenerateLogBloomCacheTest { @Mock private BlockchainQueries blockchainQueries; - @Mock private TransactionLogsIndexer transactionLogsIndexer; + @Mock private TransactionLogBloomCacher transactionLogBloomCacher; @Captor private ArgumentCaptor fromBlock; @Captor private ArgumentCaptor toBlock; @@ -53,12 +53,12 @@ public void setup() { } @Test - public void requestWithZeroParameters_NoIndexer_returnsNull() { + public void requestWithZeroParameters_NoCacher_returnsNull() { final JsonRpcRequestContext request = new JsonRpcRequestContext( new JsonRpcRequest("2.0", "admin_generateLogBloomCache", new String[] {})); - when(blockchainQueries.getTransactionLogsIndexer()).thenReturn(Optional.empty()); + when(blockchainQueries.getTransactionLogBloomCacher()).thenReturn(Optional.empty()); final JsonRpcResponse actualResponse = method.response(request); @@ -109,11 +109,11 @@ public void testMockedResult( final JsonRpcRequestContext request = new JsonRpcRequestContext(new JsonRpcRequest("2.0", "admin_generateLogBloomCache", args)); - final IndexingStatus expectedStatus = new IndexingStatus(); + final CachingStatus expectedStatus = new CachingStatus(); - when(blockchainQueries.getTransactionLogsIndexer()) - .thenReturn(Optional.of(transactionLogsIndexer)); - when(transactionLogsIndexer.requestIndexing(fromBlock.capture(), toBlock.capture())) + when(blockchainQueries.getTransactionLogBloomCacher()) + .thenReturn(Optional.of(transactionLogBloomCacher)); + when(transactionLogBloomCacher.requestCaching(fromBlock.capture(), toBlock.capture())) .thenReturn(expectedStatus); final JsonRpcResponse actualResponse = method.response(request); diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesLogCacheTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesLogCacheTest.java index f1e26ca6b49..f81d7d94ffe 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesLogCacheTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueriesLogCacheTest.java @@ -16,7 +16,7 @@ package org.hyperledger.besu.ethereum.api.query; -import static org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer.BLOCKS_PER_BLOOM_CACHE; +import static org.hyperledger.besu.ethereum.api.query.TransactionLogBloomCacher.BLOCKS_PER_BLOOM_CACHE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.times; diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java index 5e9e59816fa..f0b7a157203 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java @@ -206,4 +206,22 @@ default long observeLogs(final Consumer logObserver) { * @return {@code true} if the observer was removed; otherwise {@code false} */ boolean removeObserver(long observerId); + + /** + * Adds an observer that will get called when a new block is added after reorg. + * + *

No guarantees are made about the order in which observers are invoked. + * + * @param observer the observer to call + * @return the observer ID that can be used to remove it later. + */ + long observeChainReorg(ChainReorgObserver observer); + + /** + * Removes a previously added {@link ChainReorgObserver}. + * + * @param observerId the ID of the observer to remove + * @return {@code true} if the observer was removed; otherwise {@code false} + */ + boolean removeChainReorgObserver(long observerId); } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java new file mode 100644 index 00000000000..aeed1411295 --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java @@ -0,0 +1,22 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.chain; + +import org.hyperledger.besu.ethereum.core.BlockHeader; + +public interface ChainReorgObserver { + + void onBlockAdded(BlockHeader blockHeader, Blockchain blockchain); +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java index e64c03f2200..0602882c532 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java @@ -53,6 +53,7 @@ public class DefaultBlockchain implements MutableBlockchain { protected final BlockchainStorage blockchainStorage; private final Subscribers blockAddedObservers = Subscribers.create(); + private final Subscribers blockReorgObservers = Subscribers.create(); private volatile BlockHeader chainHeader; private volatile Difficulty totalDifficulty; @@ -344,7 +345,7 @@ private BlockAddedEvent handleChainReorg( newTransactions.put( blockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions()); addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts); - + notifyChainReorgBlockAdded(currentNewChainWithReceipts.getHeader()); currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts); } @@ -547,6 +548,17 @@ public boolean removeObserver(final long observerId) { return blockAddedObservers.unsubscribe(observerId); } + @Override + public long observeChainReorg(final ChainReorgObserver observer) { + checkNotNull(observer); + return blockReorgObservers.subscribe(observer); + } + + @Override + public boolean removeChainReorgObserver(final long observerId) { + return blockReorgObservers.unsubscribe(observerId); + } + @VisibleForTesting int observerCount() { return blockAddedObservers.getSubscriberCount(); @@ -555,4 +567,8 @@ int observerCount() { private void notifyBlockAdded(final BlockAddedEvent event) { blockAddedObservers.forEach(observer -> observer.onBlockAdded(event, this)); } + + private void notifyChainReorgBlockAdded(final BlockHeader blockHeader) { + blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockHeader, this)); + } } diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java index 4e9991549c5..9f1790fea81 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.ChainHead; +import org.hyperledger.besu.ethereum.chain.ChainReorgObserver; import org.hyperledger.besu.ethereum.chain.TransactionLocation; import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -149,6 +150,16 @@ public boolean removeObserver(final long observerId) { throw new NonDeterministicOperationException("Listening for new blocks is not deterministic"); } + @Override + public long observeChainReorg(final ChainReorgObserver observer) { + throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); + } + + @Override + public boolean removeChainReorgObserver(final long observerId) { + throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); + } + public static class NonDeterministicOperationException extends RuntimeException { public NonDeterministicOperationException(final String message) { super(message);