diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java index ae27c30e130..829fad59900 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java @@ -257,6 +257,9 @@ public void startNode(final BesuNode node) { params.add("--key-value-storage"); params.add("rocksdb"); + params.add("--auto-logs-bloom-indexing-enabled"); + params.add("false"); + LOG.info("Creating besu process with params {}", params); final ProcessBuilder processBuilder = new ProcessBuilder(params) diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java index 68e0d3cbeab..a3fce047b3e 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java @@ -195,6 +195,7 @@ public void startNode(final BesuNode node) { .map(EnodeURL::fromString) .collect(Collectors.toList())) .besuPluginContext(new BesuPluginContextImpl()) + .autoLogsBloomIndexing(false) .build(); runner.start(); diff --git a/besu/src/main/java/org/hyperledger/besu/Runner.java b/besu/src/main/java/org/hyperledger/besu/Runner.java index 08d8880bf7e..4b67f5ec61c 100644 --- a/besu/src/main/java/org/hyperledger/besu/Runner.java +++ b/besu/src/main/java/org/hyperledger/besu/Runner.java @@ -18,6 +18,9 @@ import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService; import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService; +import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogsIndexingService; +import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.ethereum.stratum.StratumServer; @@ -58,6 +61,7 @@ public class Runner implements AutoCloseable { private final BesuController besuController; private final Path dataDir; private final Optional stratumServer; + private final Optional autoTransactionLogsIndexingService; Runner( final Vertx vertx, @@ -69,7 +73,9 @@ public class Runner implements AutoCloseable { final Optional stratumServer, final Optional metrics, final BesuController besuController, - final Path dataDir) { + final Path dataDir, + final Optional transactionLogsIndexer, + final Blockchain blockchain) { this.vertx = vertx; this.networkRunner = networkRunner; this.natService = natService; @@ -80,6 +86,9 @@ public class Runner implements AutoCloseable { this.besuController = besuController; this.dataDir = dataDir; this.stratumServer = stratumServer; + this.autoTransactionLogsIndexingService = + transactionLogsIndexer.map( + indexer -> new AutoTransactionLogsIndexingService(blockchain, indexer)); } public void start() { @@ -103,6 +112,7 @@ public void start() { LOG.info("Ethereum main loop is up."); writeBesuPortsToFile(); writeBesuNetworksToFile(); + autoTransactionLogsIndexingService.ifPresent(AutoTransactionLogsIndexingService::start); } catch (final Exception ex) { LOG.error("Startup failed", ex); throw new IllegalStateException(ex); @@ -125,7 +135,7 @@ public void stop() { networkRunner.stop(); waitForServiceToStop("Network", networkRunner::awaitStop); - + autoTransactionLogsIndexingService.ifPresent(AutoTransactionLogsIndexingService::stop); natService.stop(); besuController.close(); vertx.close((res) -> vertxShutdownLatch.countDown()); diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index ff65476fde9..dc0e1754976 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -144,6 +144,7 @@ public class RunnerBuilder { private Collection staticNodes = Collections.emptyList(); private Optional identityString = Optional.empty(); private BesuPluginContextImpl besuPluginContext; + private boolean autoLogsBloomIndexing = true; public RunnerBuilder vertx(final Vertx vertx) { this.vertx = vertx; @@ -270,6 +271,11 @@ public RunnerBuilder besuPluginContext(final BesuPluginContextImpl besuPluginCon return this; } + public RunnerBuilder autoLogsBloomIndexing(final boolean autoLogsBloomIndexing) { + this.autoLogsBloomIndexing = autoLogsBloomIndexing; + return this; + } + public Runner build() { Preconditions.checkNotNull(besuController); @@ -527,7 +533,9 @@ public Runner build() { stratumServer, metricsService, besuController, - dataDir); + dataDir, + autoLogsBloomIndexing ? blockchainQueries.getTransactionLogsIndexer() : Optional.empty(), + context.getBlockchain()); } private Optional buildNodePermissioningController( diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index fc57772a0db..1a11521e98b 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -790,6 +790,12 @@ void setBannedNodeIds(final List values) { arity = "1") private String keyValueStorageName = DEFAULT_KEY_VALUE_STORAGE_NAME; + @Option( + names = {"--auto-logs-bloom-indexing-enabled"}, + description = "Enable Automatic logs bloom indexing (default: ${DEFAULT-VALUE})", + arity = "1") + private final Boolean autoLogsBloomIndexingEnabled = true; + @Option( names = {"--override-genesis-config"}, paramLabel = "NAME=VALUE", @@ -1695,6 +1701,7 @@ private void synchronize( .staticNodes(staticNodes) .identityString(identityString) .besuPluginContext(besuPluginContext) + .autoLogsBloomIndexing(autoLogsBloomIndexingEnabled) .build(); addShutdownHook(runner); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java index 5883dbc42e3..373cbd2a995 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java @@ -175,6 +175,7 @@ public void callingBesuCommandWithoutOptionsMustSyncWithDefaultValues() throws E verify(mockRunnerBuilder).webSocketConfiguration(eq(DEFAULT_WEB_SOCKET_CONFIGURATION)); verify(mockRunnerBuilder).metricsConfiguration(eq(DEFAULT_METRICS_CONFIGURATION)); verify(mockRunnerBuilder).ethNetworkConfig(ethNetworkArg.capture()); + verify(mockRunnerBuilder).autoLogsBloomIndexing(eq(true)); verify(mockRunnerBuilder).build(); verify(mockControllerBuilderFactory).fromEthNetworkConfig(ethNetworkArg.capture(), any()); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java index 9b4d6db7cb5..4d4f31ed8ee 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java @@ -220,6 +220,7 @@ public void initMocks() throws Exception { when(mockRunnerBuilder.staticNodes(any())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.identityString(any())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.besuPluginContext(any())).thenReturn(mockRunnerBuilder); + when(mockRunnerBuilder.autoLogsBloomIndexing(anyBoolean())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.build()).thenReturn(mockRunner); when(storageService.getByName("rocksdb")).thenReturn(Optional.of(rocksDBStorageFactory)); diff --git a/besu/src/test/resources/everything_config.toml b/besu/src/test/resources/everything_config.toml index 56e60dc23b0..c0b19c30033 100644 --- a/besu/src/test/resources/everything_config.toml +++ b/besu/src/test/resources/everything_config.toml @@ -134,4 +134,6 @@ revert-reason-enabled=false key-value-storage="rocksdb" # Gas limit -target-gas-limit=8000000 \ No newline at end of file +target-gas-limit=8000000 + +auto-logs-bloom-indexing-enabled=true \ No newline at end of file diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java new file mode 100644 index 00000000000..b1d71cf0890 --- /dev/null +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -0,0 +1,75 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.api.query; + +import org.hyperledger.besu.ethereum.chain.Blockchain; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.OptionalLong; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class AutoTransactionLogsIndexingService { + protected static final Logger LOG = LogManager.getLogger(); + private final Blockchain blockchain; + private final TransactionLogsIndexer transactionLogsIndexer; + private OptionalLong blockAddedSubscriptionId = OptionalLong.empty(); + private OptionalLong chainReorgSubscriptionId = OptionalLong.empty(); + + public AutoTransactionLogsIndexingService( + final Blockchain blockchain, final TransactionLogsIndexer transactionLogsIndexer) { + this.blockchain = blockchain; + this.transactionLogsIndexer = transactionLogsIndexer; + } + + public void start() { + try { + LOG.info("Starting Auto transaction logs indexing service."); + final Path cacheDir = transactionLogsIndexer.getCacheDir(); + if (!cacheDir.toFile().exists() || !cacheDir.toFile().isDirectory()) { + Files.createDirectory(cacheDir); + } + blockAddedSubscriptionId = + OptionalLong.of( + blockchain.observeBlockAdded( + (event, __) -> { + if (event.isNewCanonicalHead()) { + transactionLogsIndexer.cacheLogsBloomForBlockHeader( + event.getBlock().getHeader()); + } + })); + chainReorgSubscriptionId = + OptionalLong.of( + blockchain.observeChainReorg( + (header, __) -> transactionLogsIndexer.cacheLogsBloomForBlockHeader(header))); + + transactionLogsIndexer + .getScheduler() + .scheduleFutureTask(transactionLogsIndexer::indexAll, Duration.ofMinutes(1)); + } catch (IOException e) { + LOG.error("Unhandled indexing exception.", e); + } + } + + public void stop() { + LOG.info("Shutting down Auto transaction logs indexing service."); + blockAddedSubscriptionId.ifPresent(blockchain::removeObserver); + chainReorgSubscriptionId.ifPresent(blockchain::removeChainReorgObserver); + } +} diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index 11223e04c0d..ad7ed9d9da2 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -27,10 +27,14 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.time.Duration; +import java.util.Map; import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -44,11 +48,15 @@ public class TransactionLogsIndexer { private static final Logger LOG = LogManager.getLogger(); public static final int BLOCKS_PER_BLOOM_CACHE = 100_000; + private static final int BLOOM_BITS_LENGTH = 256; public static final String PENDING = "pending"; + private final Map cachedSegments; private final Lock submissionLock = new ReentrantLock(); + private final EthScheduler scheduler; private final Blockchain blockchain; + private final Path cacheDir; private final IndexingStatus indexingStatus = new IndexingStatus(); @@ -58,6 +66,11 @@ public TransactionLogsIndexer( this.blockchain = blockchain; this.cacheDir = cacheDir; this.scheduler = scheduler; + this.cachedSegments = new TreeMap<>(); + } + + public void indexAll() { + ensurePreviousSegmentsArePresent(blockchain.getChainHeadBlockNumber()); } private static File calculateCacheFileName(final String name, final Path cacheDir) { @@ -117,16 +130,64 @@ private long fillCacheFile( if (maybeHeader.isEmpty()) { break; } - final byte[] logs = maybeHeader.get().getLogsBloom().toArray(); - checkNotNull(logs); - checkState(logs.length == 256, "BloomBits are not the correct length"); - fos.write(logs); + fillCacheFileWithBlock(maybeHeader.get(), fos); indexingStatus.currentBlock = blockNum; blockNum++; } return blockNum - startBlock; } + public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { + try { + final long blockNumber = blockHeader.getNumber(); + LOG.debug("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); + ensurePreviousSegmentsArePresent(blockNumber); + final File cacheFile = calculateCacheFileName(blockNumber, cacheDir); + if (!cacheFile.exists()) { + Files.createFile(cacheFile.toPath()); + } + try (RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) { + final long offset = (blockNumber / BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH; + writer.seek(offset); + writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } + } catch (IOException e) { + LOG.error("Unhandled indexing exception.", e); + } + } + + private void ensurePreviousSegmentsArePresent(final long blockNumber) { + if (!indexingStatus.isIndexing()) { + scheduler.scheduleFutureTask( + () -> { + long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1; + while (currentSegment > 0) { + try { + if (!cachedSegments.getOrDefault(currentSegment, false)) { + final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE; + generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE); + cachedSegments.put(currentSegment, true); + } + } finally { + currentSegment--; + } + } + }, + Duration.ofSeconds(1)); + } + } + + private void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos) + throws IOException { + fos.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } + + private byte[] ensureBloomBitsAreCorrectLength(final byte[] logs) { + checkNotNull(logs); + checkState(logs.length == BLOOM_BITS_LENGTH, "BloomBits are not the correct length"); + return logs; + } + public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) { boolean requestAccepted = false; try { @@ -152,6 +213,14 @@ public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) return indexingStatus; } + public EthScheduler getScheduler() { + return scheduler; + } + + Path getCacheDir() { + return cacheDir; + } + public static final class IndexingStatus { long startBlock; long endBlock; diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java index 5e9e59816fa..f0b7a157203 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java @@ -206,4 +206,22 @@ default long observeLogs(final Consumer logObserver) { * @return {@code true} if the observer was removed; otherwise {@code false} */ boolean removeObserver(long observerId); + + /** + * Adds an observer that will get called when a new block is added after reorg. + * + *

No guarantees are made about the order in which observers are invoked. + * + * @param observer the observer to call + * @return the observer ID that can be used to remove it later. + */ + long observeChainReorg(ChainReorgObserver observer); + + /** + * Removes a previously added {@link ChainReorgObserver}. + * + * @param observerId the ID of the observer to remove + * @return {@code true} if the observer was removed; otherwise {@code false} + */ + boolean removeChainReorgObserver(long observerId); } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java new file mode 100644 index 00000000000..aeed1411295 --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java @@ -0,0 +1,22 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.chain; + +import org.hyperledger.besu.ethereum.core.BlockHeader; + +public interface ChainReorgObserver { + + void onBlockAdded(BlockHeader blockHeader, Blockchain blockchain); +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java index e64c03f2200..0602882c532 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java @@ -53,6 +53,7 @@ public class DefaultBlockchain implements MutableBlockchain { protected final BlockchainStorage blockchainStorage; private final Subscribers blockAddedObservers = Subscribers.create(); + private final Subscribers blockReorgObservers = Subscribers.create(); private volatile BlockHeader chainHeader; private volatile Difficulty totalDifficulty; @@ -344,7 +345,7 @@ private BlockAddedEvent handleChainReorg( newTransactions.put( blockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions()); addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts); - + notifyChainReorgBlockAdded(currentNewChainWithReceipts.getHeader()); currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts); } @@ -547,6 +548,17 @@ public boolean removeObserver(final long observerId) { return blockAddedObservers.unsubscribe(observerId); } + @Override + public long observeChainReorg(final ChainReorgObserver observer) { + checkNotNull(observer); + return blockReorgObservers.subscribe(observer); + } + + @Override + public boolean removeChainReorgObserver(final long observerId) { + return blockReorgObservers.unsubscribe(observerId); + } + @VisibleForTesting int observerCount() { return blockAddedObservers.getSubscriberCount(); @@ -555,4 +567,8 @@ int observerCount() { private void notifyBlockAdded(final BlockAddedEvent event) { blockAddedObservers.forEach(observer -> observer.onBlockAdded(event, this)); } + + private void notifyChainReorgBlockAdded(final BlockHeader blockHeader) { + blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockHeader, this)); + } } diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java index 4e9991549c5..9f1790fea81 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.ChainHead; +import org.hyperledger.besu.ethereum.chain.ChainReorgObserver; import org.hyperledger.besu.ethereum.chain.TransactionLocation; import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -149,6 +150,16 @@ public boolean removeObserver(final long observerId) { throw new NonDeterministicOperationException("Listening for new blocks is not deterministic"); } + @Override + public long observeChainReorg(final ChainReorgObserver observer) { + throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); + } + + @Override + public boolean removeChainReorgObserver(final long observerId) { + throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); + } + public static class NonDeterministicOperationException extends RuntimeException { public NonDeterministicOperationException(final String message) { super(message);