From 6963071ba7eea3a0c636752d641759f19a9f6b55 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 5 Feb 2020 10:50:19 +0100 Subject: [PATCH 01/18] First iteration. Draft PR. Signed-off-by: Abdelhamid Bakhta --- .../java/org/hyperledger/besu/Runner.java | 11 ++++- .../org/hyperledger/besu/RunnerBuilder.java | 3 +- .../AutoTransactionLogsIndexingService.java | 41 +++++++++++++++++++ .../api/query/TransactionLogsIndexer.java | 16 ++++++++ 4 files changed, 68 insertions(+), 3 deletions(-) create mode 100644 ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java diff --git a/besu/src/main/java/org/hyperledger/besu/Runner.java b/besu/src/main/java/org/hyperledger/besu/Runner.java index 08d8880bf7e..bdc701cf8dd 100644 --- a/besu/src/main/java/org/hyperledger/besu/Runner.java +++ b/besu/src/main/java/org/hyperledger/besu/Runner.java @@ -18,6 +18,8 @@ import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService; import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService; +import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogsIndexingService; +import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer; import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.ethereum.stratum.StratumServer; @@ -58,6 +60,7 @@ public class Runner implements AutoCloseable { private final BesuController besuController; private final Path dataDir; private final Optional stratumServer; + private final Optional autoTransactionLogsIndexingService; Runner( final Vertx vertx, @@ -69,7 +72,8 @@ public class Runner implements AutoCloseable { final Optional stratumServer, final Optional metrics, final BesuController besuController, - final Path dataDir) { + final Path dataDir, + final Optional transactionLogsIndexer) { this.vertx = vertx; this.networkRunner = networkRunner; this.natService = natService; @@ -80,6 +84,8 @@ public class Runner implements AutoCloseable { this.besuController = besuController; this.dataDir = dataDir; this.stratumServer = stratumServer; + this.autoTransactionLogsIndexingService = + transactionLogsIndexer.map(AutoTransactionLogsIndexingService::new); } public void start() { @@ -103,6 +109,7 @@ public void start() { LOG.info("Ethereum main loop is up."); writeBesuPortsToFile(); writeBesuNetworksToFile(); + autoTransactionLogsIndexingService.ifPresent(AutoTransactionLogsIndexingService::start); } catch (final Exception ex) { LOG.error("Startup failed", ex); throw new IllegalStateException(ex); @@ -125,7 +132,7 @@ public void stop() { networkRunner.stop(); waitForServiceToStop("Network", networkRunner::awaitStop); - + autoTransactionLogsIndexingService.ifPresent(AutoTransactionLogsIndexingService::stop); natService.stop(); besuController.close(); vertx.close((res) -> vertxShutdownLatch.countDown()); diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index a6c832bbe1c..25d9a199383 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -526,7 +526,8 @@ public Runner build() { stratumServer, metricsService, besuController, - dataDir); + dataDir, + blockchainQueries.getTransactionLogsIndexer()); } private Optional buildNodePermissioningController( diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java new file mode 100644 index 00000000000..cad2b548de3 --- /dev/null +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -0,0 +1,41 @@ +package org.hyperledger.besu.ethereum.api.query; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class AutoTransactionLogsIndexingService { + protected static final Logger LOG = LogManager.getLogger(); + + private final TransactionLogsIndexer transactionLogsIndexer; + private final ScheduledExecutorService executorService = + Executors.newSingleThreadScheduledExecutor(); + private TransactionLogsIndexer.IndexingStatus lastIndexingStatus; + + public AutoTransactionLogsIndexingService(final TransactionLogsIndexer transactionLogsIndexer) { + this.transactionLogsIndexer = transactionLogsIndexer; + } + + public void start() { + LOG.info("Starting Auto transaction logs indexing service."); + executorService.scheduleAtFixedRate(this::doIndex, 0, 10, TimeUnit.SECONDS); + } + + public void doIndex() { + LOG.info("Starting auto scheduled indexing."); + long startBlock = 0, stopBlock = Long.MAX_VALUE; + if (lastIndexingStatus != null) { + startBlock = lastIndexingStatus.currentBlock; + } + LOG.info("Calling log bloom cache with start = {} and stop = {}", startBlock, stopBlock); + lastIndexingStatus = transactionLogsIndexer.generateLogBloomCache(startBlock, stopBlock); + LOG.info("generateLogBloomCache completed with status: {}", lastIndexingStatus.toString()); + } + + public void stop() { + LOG.info("Shutting down Auto transaction logs indexing service."); + } +} diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index 11223e04c0d..db7ccacd446 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -183,5 +183,21 @@ public boolean isIndexing() { public boolean isRequestAccepted() { return requestAccepted; } + + @Override + public String toString() { + return "IndexingStatus{" + + "startBlock=" + + startBlock + + ", endBlock=" + + endBlock + + ", currentBlock=" + + currentBlock + + ", indexing=" + + indexing + + ", requestAccepted=" + + requestAccepted + + '}'; + } } } From ca62e34ad9f2421a1ba7f454e987cab804a136c1 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 5 Feb 2020 14:24:02 +0100 Subject: [PATCH 02/18] fix SPDX header Signed-off-by: Abdelhamid Bakhta --- .../query/AutoTransactionLogsIndexingService.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java index cad2b548de3..c53aea02302 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -1,3 +1,17 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ package org.hyperledger.besu.ethereum.api.query; import java.util.concurrent.Executors; From 388d0dcffa49a43ed47faedefd3b47410cd40c80 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 5 Feb 2020 18:32:55 +0100 Subject: [PATCH 03/18] Use block broadcaster to index log bloom. Signed-off-by: Abdelhamid Bakhta --- .../java/org/hyperledger/besu/Runner.java | 5 ++- .../org/hyperledger/besu/RunnerBuilder.java | 1 + .../AutoTransactionLogsIndexingService.java | 36 ++++++++++--------- .../api/query/TransactionLogsIndexer.java | 20 ++++++++--- 4 files changed, 40 insertions(+), 22 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/Runner.java b/besu/src/main/java/org/hyperledger/besu/Runner.java index bdc701cf8dd..8856c079648 100644 --- a/besu/src/main/java/org/hyperledger/besu/Runner.java +++ b/besu/src/main/java/org/hyperledger/besu/Runner.java @@ -20,6 +20,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService; import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogsIndexingService; import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer; +import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.ethereum.stratum.StratumServer; @@ -73,6 +74,7 @@ public class Runner implements AutoCloseable { final Optional metrics, final BesuController besuController, final Path dataDir, + final BlockBroadcaster blockBroadcaster, final Optional transactionLogsIndexer) { this.vertx = vertx; this.networkRunner = networkRunner; @@ -85,7 +87,8 @@ public class Runner implements AutoCloseable { this.dataDir = dataDir; this.stratumServer = stratumServer; this.autoTransactionLogsIndexingService = - transactionLogsIndexer.map(AutoTransactionLogsIndexingService::new); + transactionLogsIndexer.map( + indexer -> new AutoTransactionLogsIndexingService(blockBroadcaster, indexer)); } public void start() { diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index 25d9a199383..806985ef197 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -527,6 +527,7 @@ public Runner build() { metricsService, besuController, dataDir, + besuController.getProtocolManager().getBlockBroadcaster(), blockchainQueries.getTransactionLogsIndexer()); } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java index c53aea02302..d9c356e521d 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -14,42 +14,44 @@ */ package org.hyperledger.besu.ethereum.api.query; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; + +import java.io.IOException; +import java.util.OptionalLong; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class AutoTransactionLogsIndexingService { protected static final Logger LOG = LogManager.getLogger(); - + private final BlockBroadcaster blockBroadcaster; private final TransactionLogsIndexer transactionLogsIndexer; - private final ScheduledExecutorService executorService = - Executors.newSingleThreadScheduledExecutor(); - private TransactionLogsIndexer.IndexingStatus lastIndexingStatus; + private OptionalLong subscriptionId = OptionalLong.empty(); - public AutoTransactionLogsIndexingService(final TransactionLogsIndexer transactionLogsIndexer) { + public AutoTransactionLogsIndexingService( + final BlockBroadcaster blockBroadcaster, + final TransactionLogsIndexer transactionLogsIndexer) { + this.blockBroadcaster = blockBroadcaster; this.transactionLogsIndexer = transactionLogsIndexer; } public void start() { LOG.info("Starting Auto transaction logs indexing service."); - executorService.scheduleAtFixedRate(this::doIndex, 0, 10, TimeUnit.SECONDS); + subscriptionId = OptionalLong.of(blockBroadcaster.subscribePropagateNewBlocks(this::accept)); } - public void doIndex() { - LOG.info("Starting auto scheduled indexing."); - long startBlock = 0, stopBlock = Long.MAX_VALUE; - if (lastIndexingStatus != null) { - startBlock = lastIndexingStatus.currentBlock; + private void accept(final Block block, final Difficulty totalDifficulty) { + try { + transactionLogsIndexer.fillPendingCacheWithBlock(block.getHeader()); + } catch (IOException e) { + LOG.error("Unhandled indexing exception.", e); } - LOG.info("Calling log bloom cache with start = {} and stop = {}", startBlock, stopBlock); - lastIndexingStatus = transactionLogsIndexer.generateLogBloomCache(startBlock, stopBlock); - LOG.info("generateLogBloomCache completed with status: {}", lastIndexingStatus.toString()); } public void stop() { LOG.info("Shutting down Auto transaction logs indexing service."); + subscriptionId.ifPresent(blockBroadcaster::unsubscribePropagateNewBlocks); } } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index db7ccacd446..9717b12e08d 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -117,16 +117,28 @@ private long fillCacheFile( if (maybeHeader.isEmpty()) { break; } - final byte[] logs = maybeHeader.get().getLogsBloom().toArray(); - checkNotNull(logs); - checkState(logs.length == 256, "BloomBits are not the correct length"); - fos.write(logs); + fillCacheFileWithBlock(maybeHeader.get(), fos); indexingStatus.currentBlock = blockNum; blockNum++; } return blockNum - startBlock; } + public void fillPendingCacheWithBlock(final BlockHeader blockHeader) throws IOException { + final File pendingFile = calculateCacheFileName(PENDING, cacheDir); + try (FileOutputStream fos = new FileOutputStream(pendingFile)) { + fillCacheFileWithBlock(blockHeader, fos); + } + } + + public void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos) + throws IOException { + final byte[] logs = blockHeader.getLogsBloom().toArray(); + checkNotNull(logs); + checkState(logs.length == 256, "BloomBits are not the correct length"); + fos.write(logs); + } + public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) { boolean requestAccepted = false; try { From 94fccd4f122d7ab9418757dd41ce04fc6099ad6b Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 5 Feb 2020 18:34:15 +0100 Subject: [PATCH 04/18] Remove useless toString method Signed-off-by: Abdelhamid Bakhta --- .../api/query/TransactionLogsIndexer.java | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index 9717b12e08d..e4117939b9a 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -16,10 +16,9 @@ package org.hyperledger.besu.ethereum.api.query; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -35,9 +34,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; public class TransactionLogsIndexer { @@ -195,21 +194,5 @@ public boolean isIndexing() { public boolean isRequestAccepted() { return requestAccepted; } - - @Override - public String toString() { - return "IndexingStatus{" - + "startBlock=" - + startBlock - + ", endBlock=" - + endBlock - + ", currentBlock=" - + currentBlock - + ", indexing=" - + indexing - + ", requestAccepted=" - + requestAccepted - + '}'; - } } } From 997c333d0714201fd753a5affefc2828624b5d61 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 5 Feb 2020 18:37:17 +0100 Subject: [PATCH 05/18] spotless apply Signed-off-by: Abdelhamid Bakhta --- .../ethereum/api/query/TransactionLogsIndexer.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index e4117939b9a..8f38751f430 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -16,9 +16,10 @@ package org.hyperledger.besu.ethereum.api.query; -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -34,9 +35,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class TransactionLogsIndexer { From a29dcee1abee07fdbb5558b089b6138e2979bd27 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Thu, 6 Feb 2020 09:54:14 +0100 Subject: [PATCH 06/18] cacheLogsBloomForBlockHeader Signed-off-by: Abdelhamid Bakhta --- .../AutoTransactionLogsIndexingService.java | 21 ++++---- .../api/query/TransactionLogsIndexer.java | 49 +++++++++++++------ 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java index d9c356e521d..568d5c71fed 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -14,11 +14,9 @@ */ package org.hyperledger.besu.ethereum.api.query; -import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.Difficulty; import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; -import java.io.IOException; +import java.time.Duration; import java.util.OptionalLong; import org.apache.logging.log4j.LogManager; @@ -39,15 +37,14 @@ public AutoTransactionLogsIndexingService( public void start() { LOG.info("Starting Auto transaction logs indexing service."); - subscriptionId = OptionalLong.of(blockBroadcaster.subscribePropagateNewBlocks(this::accept)); - } - - private void accept(final Block block, final Difficulty totalDifficulty) { - try { - transactionLogsIndexer.fillPendingCacheWithBlock(block.getHeader()); - } catch (IOException e) { - LOG.error("Unhandled indexing exception.", e); - } + subscriptionId = + OptionalLong.of( + blockBroadcaster.subscribePropagateNewBlocks( + (block, __) -> + transactionLogsIndexer.cacheLogsBloomForBlockHeader(block.getHeader()))); + transactionLogsIndexer + .getScheduler() + .scheduleFutureTask(transactionLogsIndexer::indexAll, Duration.ofMinutes(1)); } public void stop() { diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index 8f38751f430..5b7f0a434e4 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -16,10 +16,9 @@ package org.hyperledger.besu.ethereum.api.query; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -27,6 +26,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -35,18 +35,20 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; public class TransactionLogsIndexer { private static final Logger LOG = LogManager.getLogger(); public static final int BLOCKS_PER_BLOOM_CACHE = 100_000; + private static final int BLOOM_BITS_LENGTH = 256; public static final String PENDING = "pending"; private final Lock submissionLock = new ReentrantLock(); + private final EthScheduler scheduler; private final Blockchain blockchain; private final Path cacheDir; @@ -60,6 +62,10 @@ public TransactionLogsIndexer( this.scheduler = scheduler; } + public IndexingStatus indexAll() { + return generateLogBloomCache(0, Long.MAX_VALUE); + } + private static File calculateCacheFileName(final String name, final Path cacheDir) { return cacheDir.resolve("logBloom-" + name + ".cache").toFile(); } @@ -124,19 +130,28 @@ private long fillCacheFile( return blockNum - startBlock; } - public void fillPendingCacheWithBlock(final BlockHeader blockHeader) throws IOException { - final File pendingFile = calculateCacheFileName(PENDING, cacheDir); - try (FileOutputStream fos = new FileOutputStream(pendingFile)) { - fillCacheFileWithBlock(blockHeader, fos); + public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { + final long blockNumber = blockHeader.getNumber(); + LOG.info("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); + final File cacheFile = calculateCacheFileName(blockNumber, cacheDir); + try (RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) { + final long offset = (blockNumber / BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH; + writer.seek(offset); + writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } catch (IOException e) { + LOG.error("Unhandled indexing exception.", e); } } - public void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos) + private void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos) throws IOException { - final byte[] logs = blockHeader.getLogsBloom().toArray(); + fos.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } + + private byte[] ensureBloomBitsAreCorrectLength(final byte[] logs) { checkNotNull(logs); - checkState(logs.length == 256, "BloomBits are not the correct length"); - fos.write(logs); + checkState(logs.length == BLOOM_BITS_LENGTH, "BloomBits are not the correct length"); + return logs; } public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) { @@ -164,6 +179,10 @@ public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) return indexingStatus; } + public EthScheduler getScheduler() { + return scheduler; + } + public static final class IndexingStatus { long startBlock; long endBlock; From 34fe368127f2f8375d7bd83473c72f4957801b96 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Thu, 6 Feb 2020 09:56:51 +0100 Subject: [PATCH 07/18] spotless apply Signed-off-by: Abdelhamid Bakhta --- .../ethereum/api/query/TransactionLogsIndexer.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index 5b7f0a434e4..28a00c85114 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -16,9 +16,10 @@ package org.hyperledger.besu.ethereum.api.query; -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -35,9 +36,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class TransactionLogsIndexer { From 3daa9a43d6e81d9234849228f77632bf07af75d7 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Fri, 7 Feb 2020 14:17:41 +0100 Subject: [PATCH 08/18] ensurePreviousSegmentsArePresent Signed-off-by: Abdelhamid Bakhta --- .../api/query/TransactionLogsIndexer.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index 28a00c85114..d1f68b08923 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -28,9 +28,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -46,6 +48,7 @@ public class TransactionLogsIndexer { public static final int BLOCKS_PER_BLOOM_CACHE = 100_000; private static final int BLOOM_BITS_LENGTH = 256; + private static final int CACHE_FILE_SIZE = BLOCKS_PER_BLOOM_CACHE * BLOOM_BITS_LENGTH; public static final String PENDING = "pending"; private final Lock submissionLock = new ReentrantLock(); @@ -134,6 +137,7 @@ private long fillCacheFile( public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { final long blockNumber = blockHeader.getNumber(); LOG.info("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); + ensurePreviousSegmentsArePresent(blockNumber); final File cacheFile = calculateCacheFileName(blockNumber, cacheDir); try (RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) { final long offset = (blockNumber / BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH; @@ -144,6 +148,31 @@ public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { } } + private void ensurePreviousSegmentsArePresent(final long blockNumber) { + scheduler.scheduleFutureTask( + () -> { + long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1; + while (currentSegment > 0) { + try { + if (!isCachePresentForSegment(currentSegment)) { + final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE; + generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE); + } + } catch (IOException e) { + LOG.error("Unhandled indexing exception.", e); + } finally { + currentSegment--; + } + } + }, + Duration.ofSeconds(1)); + } + + private boolean isCachePresentForSegment(final long segment) throws IOException { + final File cacheFile = calculateCacheFileName(Long.toString(segment), cacheDir); + return cacheFile.exists() && FileChannel.open(cacheFile.toPath()).size() == CACHE_FILE_SIZE; + } + private void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos) throws IOException { fos.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); From 34b1e50a19c446155ff2c4d6c89c64c60a8534ef Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Fri, 7 Feb 2020 17:55:26 +0100 Subject: [PATCH 09/18] Added CLI flag to enable / disable automatic logs bloom indexing. Signed-off-by: Abdelhamid Bakhta --- .../src/main/java/org/hyperledger/besu/RunnerBuilder.java | 8 +++++++- .../main/java/org/hyperledger/besu/cli/BesuCommand.java | 7 +++++++ .../java/org/hyperledger/besu/cli/BesuCommandTest.java | 1 + .../org/hyperledger/besu/cli/CommandTestAbstract.java | 1 + besu/src/test/resources/everything_config.toml | 4 +++- 5 files changed, 19 insertions(+), 2 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index 806985ef197..1ad899e07bc 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -142,6 +142,7 @@ public class RunnerBuilder { private Collection staticNodes = Collections.emptyList(); private Optional identityString = Optional.empty(); private BesuPluginContextImpl besuPluginContext; + private boolean autoLogsBloomIndexing = true; public RunnerBuilder vertx(final Vertx vertx) { this.vertx = vertx; @@ -268,6 +269,11 @@ public RunnerBuilder besuPluginContext(final BesuPluginContextImpl besuPluginCon return this; } + public RunnerBuilder autoLogsBloomIndexing(final boolean autoLogsBloomIndexing) { + this.autoLogsBloomIndexing = autoLogsBloomIndexing; + return this; + } + public Runner build() { Preconditions.checkNotNull(besuController); @@ -528,7 +534,7 @@ public Runner build() { besuController, dataDir, besuController.getProtocolManager().getBlockBroadcaster(), - blockchainQueries.getTransactionLogsIndexer()); + autoLogsBloomIndexing ? blockchainQueries.getTransactionLogsIndexer() : Optional.empty()); } private Optional buildNodePermissioningController( diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index fc57772a0db..1a11521e98b 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -790,6 +790,12 @@ void setBannedNodeIds(final List values) { arity = "1") private String keyValueStorageName = DEFAULT_KEY_VALUE_STORAGE_NAME; + @Option( + names = {"--auto-logs-bloom-indexing-enabled"}, + description = "Enable Automatic logs bloom indexing (default: ${DEFAULT-VALUE})", + arity = "1") + private final Boolean autoLogsBloomIndexingEnabled = true; + @Option( names = {"--override-genesis-config"}, paramLabel = "NAME=VALUE", @@ -1695,6 +1701,7 @@ private void synchronize( .staticNodes(staticNodes) .identityString(identityString) .besuPluginContext(besuPluginContext) + .autoLogsBloomIndexing(autoLogsBloomIndexingEnabled) .build(); addShutdownHook(runner); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java index 45128ef9751..d24d4ed2fc0 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/BesuCommandTest.java @@ -175,6 +175,7 @@ public void callingBesuCommandWithoutOptionsMustSyncWithDefaultValues() throws E verify(mockRunnerBuilder).webSocketConfiguration(eq(DEFAULT_WEB_SOCKET_CONFIGURATION)); verify(mockRunnerBuilder).metricsConfiguration(eq(DEFAULT_METRICS_CONFIGURATION)); verify(mockRunnerBuilder).ethNetworkConfig(ethNetworkArg.capture()); + verify(mockRunnerBuilder).autoLogsBloomIndexing(eq(true)); verify(mockRunnerBuilder).build(); verify(mockControllerBuilderFactory).fromEthNetworkConfig(ethNetworkArg.capture(), any()); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java index 9b4d6db7cb5..4d4f31ed8ee 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java @@ -220,6 +220,7 @@ public void initMocks() throws Exception { when(mockRunnerBuilder.staticNodes(any())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.identityString(any())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.besuPluginContext(any())).thenReturn(mockRunnerBuilder); + when(mockRunnerBuilder.autoLogsBloomIndexing(anyBoolean())).thenReturn(mockRunnerBuilder); when(mockRunnerBuilder.build()).thenReturn(mockRunner); when(storageService.getByName("rocksdb")).thenReturn(Optional.of(rocksDBStorageFactory)); diff --git a/besu/src/test/resources/everything_config.toml b/besu/src/test/resources/everything_config.toml index 56e60dc23b0..c0b19c30033 100644 --- a/besu/src/test/resources/everything_config.toml +++ b/besu/src/test/resources/everything_config.toml @@ -134,4 +134,6 @@ revert-reason-enabled=false key-value-storage="rocksdb" # Gas limit -target-gas-limit=8000000 \ No newline at end of file +target-gas-limit=8000000 + +auto-logs-bloom-indexing-enabled=true \ No newline at end of file From f545f708fc3cf30dd39b178427875ee11153e7a7 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Mon, 10 Feb 2020 10:11:46 +0100 Subject: [PATCH 10/18] Create cache directory and cache file if not exist. Signed-off-by: Abdelhamid Bakhta --- .../AutoTransactionLogsIndexingService.java | 29 +++++++++++++------ .../api/query/TransactionLogsIndexer.java | 26 ++++++++++++----- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java index 568d5c71fed..e2e0149c857 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -16,6 +16,9 @@ import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; import java.util.OptionalLong; @@ -36,15 +39,23 @@ public AutoTransactionLogsIndexingService( } public void start() { - LOG.info("Starting Auto transaction logs indexing service."); - subscriptionId = - OptionalLong.of( - blockBroadcaster.subscribePropagateNewBlocks( - (block, __) -> - transactionLogsIndexer.cacheLogsBloomForBlockHeader(block.getHeader()))); - transactionLogsIndexer - .getScheduler() - .scheduleFutureTask(transactionLogsIndexer::indexAll, Duration.ofMinutes(1)); + try { + LOG.info("Starting Auto transaction logs indexing service."); + final Path cacheDir = transactionLogsIndexer.getCacheDir(); + if (!cacheDir.toFile().exists() || !cacheDir.toFile().isDirectory()) { + Files.createDirectory(cacheDir); + } + subscriptionId = + OptionalLong.of( + blockBroadcaster.subscribePropagateNewBlocks( + (block, __) -> + transactionLogsIndexer.cacheLogsBloomForBlockHeader(block.getHeader()))); + transactionLogsIndexer + .getScheduler() + .scheduleFutureTask(transactionLogsIndexer::indexAll, Duration.ofMinutes(1)); + } catch (IOException e) { + LOG.error("Unhandled indexing exception.", e); + } } public void stop() { diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index d1f68b08923..d8a6dcce13d 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -55,6 +55,7 @@ public class TransactionLogsIndexer { private final EthScheduler scheduler; private final Blockchain blockchain; + private final Path cacheDir; private final IndexingStatus indexingStatus = new IndexingStatus(); @@ -135,14 +136,19 @@ private long fillCacheFile( } public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { - final long blockNumber = blockHeader.getNumber(); - LOG.info("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); - ensurePreviousSegmentsArePresent(blockNumber); - final File cacheFile = calculateCacheFileName(blockNumber, cacheDir); - try (RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) { - final long offset = (blockNumber / BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH; - writer.seek(offset); - writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + try { + final long blockNumber = blockHeader.getNumber(); + LOG.info("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); + ensurePreviousSegmentsArePresent(blockNumber); + final File cacheFile = calculateCacheFileName(blockNumber, cacheDir); + if (!cacheFile.exists()) { + Files.createFile(cacheFile.toPath()); + } + try (RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) { + final long offset = (blockNumber / BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH; + writer.seek(offset); + writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray())); + } } catch (IOException e) { LOG.error("Unhandled indexing exception.", e); } @@ -213,6 +219,10 @@ public EthScheduler getScheduler() { return scheduler; } + Path getCacheDir() { + return cacheDir; + } + public static final class IndexingStatus { long startBlock; long endBlock; From eb48fec2d60d52ae3f8244288450a73b5a5390bb Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Mon, 10 Feb 2020 10:42:03 +0100 Subject: [PATCH 11/18] Fix acceptance test Signed-off-by: Abdelhamid Bakhta --- .../besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java | 3 +++ .../besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java | 1 + 2 files changed, 4 insertions(+) diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java index e46a97c9aa3..7f137b4d453 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java @@ -250,6 +250,9 @@ public void startNode(final BesuNode node) { params.add("--key-value-storage"); params.add("rocksdb"); + params.add("--auto-logs-bloom-indexing-enabled"); + params.add("false"); + LOG.info("Creating besu process with params {}", params); final ProcessBuilder processBuilder = new ProcessBuilder(params) diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java index 68e0d3cbeab..a3fce047b3e 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java @@ -195,6 +195,7 @@ public void startNode(final BesuNode node) { .map(EnodeURL::fromString) .collect(Collectors.toList())) .besuPluginContext(new BesuPluginContextImpl()) + .autoLogsBloomIndexing(false) .build(); runner.start(); From ec42713e0ca788308ce3a2be7def86dea786f06e Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Tue, 11 Feb 2020 09:23:40 +0100 Subject: [PATCH 12/18] Write cache for block only if block is new canonical head. Signed-off-by: Abdelhamid Bakhta --- .../java/org/hyperledger/besu/Runner.java | 8 +++---- .../org/hyperledger/besu/RunnerBuilder.java | 4 ++-- .../AutoTransactionLogsIndexingService.java | 22 +++++++++++-------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/Runner.java b/besu/src/main/java/org/hyperledger/besu/Runner.java index 8856c079648..4b67f5ec61c 100644 --- a/besu/src/main/java/org/hyperledger/besu/Runner.java +++ b/besu/src/main/java/org/hyperledger/besu/Runner.java @@ -20,7 +20,7 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService; import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogsIndexingService; import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer; -import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner; import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL; import org.hyperledger.besu.ethereum.stratum.StratumServer; @@ -74,8 +74,8 @@ public class Runner implements AutoCloseable { final Optional metrics, final BesuController besuController, final Path dataDir, - final BlockBroadcaster blockBroadcaster, - final Optional transactionLogsIndexer) { + final Optional transactionLogsIndexer, + final Blockchain blockchain) { this.vertx = vertx; this.networkRunner = networkRunner; this.natService = natService; @@ -88,7 +88,7 @@ public class Runner implements AutoCloseable { this.stratumServer = stratumServer; this.autoTransactionLogsIndexingService = transactionLogsIndexer.map( - indexer -> new AutoTransactionLogsIndexingService(blockBroadcaster, indexer)); + indexer -> new AutoTransactionLogsIndexingService(blockchain, indexer)); } public void start() { diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index a2c768e32cd..dc0e1754976 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -534,8 +534,8 @@ public Runner build() { metricsService, besuController, dataDir, - besuController.getProtocolManager().getBlockBroadcaster(), - autoLogsBloomIndexing ? blockchainQueries.getTransactionLogsIndexer() : Optional.empty()); + autoLogsBloomIndexing ? blockchainQueries.getTransactionLogsIndexer() : Optional.empty(), + context.getBlockchain()); } private Optional buildNodePermissioningController( diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java index e2e0149c857..8a34922465d 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -14,7 +14,7 @@ */ package org.hyperledger.besu.ethereum.api.query; -import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; +import org.hyperledger.besu.ethereum.chain.Blockchain; import java.io.IOException; import java.nio.file.Files; @@ -27,14 +27,13 @@ public class AutoTransactionLogsIndexingService { protected static final Logger LOG = LogManager.getLogger(); - private final BlockBroadcaster blockBroadcaster; + private final Blockchain blockchain; private final TransactionLogsIndexer transactionLogsIndexer; private OptionalLong subscriptionId = OptionalLong.empty(); public AutoTransactionLogsIndexingService( - final BlockBroadcaster blockBroadcaster, - final TransactionLogsIndexer transactionLogsIndexer) { - this.blockBroadcaster = blockBroadcaster; + final Blockchain blockchain, final TransactionLogsIndexer transactionLogsIndexer) { + this.blockchain = blockchain; this.transactionLogsIndexer = transactionLogsIndexer; } @@ -47,9 +46,14 @@ public void start() { } subscriptionId = OptionalLong.of( - blockBroadcaster.subscribePropagateNewBlocks( - (block, __) -> - transactionLogsIndexer.cacheLogsBloomForBlockHeader(block.getHeader()))); + blockchain.observeBlockAdded( + (event, __) -> { + if (event.isNewCanonicalHead()) { + transactionLogsIndexer.cacheLogsBloomForBlockHeader( + event.getBlock().getHeader()); + } + })); + transactionLogsIndexer .getScheduler() .scheduleFutureTask(transactionLogsIndexer::indexAll, Duration.ofMinutes(1)); @@ -60,6 +64,6 @@ public void start() { public void stop() { LOG.info("Shutting down Auto transaction logs indexing service."); - subscriptionId.ifPresent(blockBroadcaster::unsubscribePropagateNewBlocks); + subscriptionId.ifPresent(blockchain::removeObserver); } } From 18f1a22fe033a608f3ad99fa365752b42c7bd296 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 12 Feb 2020 09:42:17 +0100 Subject: [PATCH 13/18] Handling of chain reorg. Signed-off-by: Abdelhamid Bakhta --- .../AutoTransactionLogsIndexingService.java | 12 +++++++--- .../besu/ethereum/chain/Blockchain.java | 18 +++++++++++++++ .../ethereum/chain/ChainReorgObserver.java | 22 +++++++++++++++++++ .../ethereum/chain/DefaultBlockchain.java | 18 ++++++++++++++- .../besu/ethereum/vm/TestBlockchain.java | 11 ++++++++++ 5 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java index 8a34922465d..b1d71cf0890 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/AutoTransactionLogsIndexingService.java @@ -29,7 +29,8 @@ public class AutoTransactionLogsIndexingService { protected static final Logger LOG = LogManager.getLogger(); private final Blockchain blockchain; private final TransactionLogsIndexer transactionLogsIndexer; - private OptionalLong subscriptionId = OptionalLong.empty(); + private OptionalLong blockAddedSubscriptionId = OptionalLong.empty(); + private OptionalLong chainReorgSubscriptionId = OptionalLong.empty(); public AutoTransactionLogsIndexingService( final Blockchain blockchain, final TransactionLogsIndexer transactionLogsIndexer) { @@ -44,7 +45,7 @@ public void start() { if (!cacheDir.toFile().exists() || !cacheDir.toFile().isDirectory()) { Files.createDirectory(cacheDir); } - subscriptionId = + blockAddedSubscriptionId = OptionalLong.of( blockchain.observeBlockAdded( (event, __) -> { @@ -53,6 +54,10 @@ public void start() { event.getBlock().getHeader()); } })); + chainReorgSubscriptionId = + OptionalLong.of( + blockchain.observeChainReorg( + (header, __) -> transactionLogsIndexer.cacheLogsBloomForBlockHeader(header))); transactionLogsIndexer .getScheduler() @@ -64,6 +69,7 @@ public void start() { public void stop() { LOG.info("Shutting down Auto transaction logs indexing service."); - subscriptionId.ifPresent(blockchain::removeObserver); + blockAddedSubscriptionId.ifPresent(blockchain::removeObserver); + chainReorgSubscriptionId.ifPresent(blockchain::removeChainReorgObserver); } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java index 5e9e59816fa..f0b7a157203 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java @@ -206,4 +206,22 @@ default long observeLogs(final Consumer logObserver) { * @return {@code true} if the observer was removed; otherwise {@code false} */ boolean removeObserver(long observerId); + + /** + * Adds an observer that will get called when a new block is added after reorg. + * + *

No guarantees are made about the order in which observers are invoked. + * + * @param observer the observer to call + * @return the observer ID that can be used to remove it later. + */ + long observeChainReorg(ChainReorgObserver observer); + + /** + * Removes a previously added {@link ChainReorgObserver}. + * + * @param observerId the ID of the observer to remove + * @return {@code true} if the observer was removed; otherwise {@code false} + */ + boolean removeChainReorgObserver(long observerId); } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java new file mode 100644 index 00000000000..aeed1411295 --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainReorgObserver.java @@ -0,0 +1,22 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.chain; + +import org.hyperledger.besu.ethereum.core.BlockHeader; + +public interface ChainReorgObserver { + + void onBlockAdded(BlockHeader blockHeader, Blockchain blockchain); +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java index e64c03f2200..0602882c532 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/DefaultBlockchain.java @@ -53,6 +53,7 @@ public class DefaultBlockchain implements MutableBlockchain { protected final BlockchainStorage blockchainStorage; private final Subscribers blockAddedObservers = Subscribers.create(); + private final Subscribers blockReorgObservers = Subscribers.create(); private volatile BlockHeader chainHeader; private volatile Difficulty totalDifficulty; @@ -344,7 +345,7 @@ private BlockAddedEvent handleChainReorg( newTransactions.put( blockHash, currentNewChainWithReceipts.getBlock().getBody().getTransactions()); addAddedLogsWithMetadata(addedLogsWithMetadata, currentNewChainWithReceipts); - + notifyChainReorgBlockAdded(currentNewChainWithReceipts.getHeader()); currentNewChainWithReceipts = getParentBlockWithReceipts(currentNewChainWithReceipts); } @@ -547,6 +548,17 @@ public boolean removeObserver(final long observerId) { return blockAddedObservers.unsubscribe(observerId); } + @Override + public long observeChainReorg(final ChainReorgObserver observer) { + checkNotNull(observer); + return blockReorgObservers.subscribe(observer); + } + + @Override + public boolean removeChainReorgObserver(final long observerId) { + return blockReorgObservers.unsubscribe(observerId); + } + @VisibleForTesting int observerCount() { return blockAddedObservers.getSubscriberCount(); @@ -555,4 +567,8 @@ int observerCount() { private void notifyBlockAdded(final BlockAddedEvent event) { blockAddedObservers.forEach(observer -> observer.onBlockAdded(event, this)); } + + private void notifyChainReorgBlockAdded(final BlockHeader blockHeader) { + blockReorgObservers.forEach(observer -> observer.onBlockAdded(blockHeader, this)); + } } diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java index 4e9991549c5..5d0b6ac56f4 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java @@ -19,6 +19,7 @@ import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.ChainHead; +import org.hyperledger.besu.ethereum.chain.ChainReorgObserver; import org.hyperledger.besu.ethereum.chain.TransactionLocation; import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -149,6 +150,16 @@ public boolean removeObserver(final long observerId) { throw new NonDeterministicOperationException("Listening for new blocks is not deterministic"); } + @Override + public long observeChainReorg(ChainReorgObserver observer) { + throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); + } + + @Override + public boolean removeChainReorgObserver(long observerId) { + throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); + } + public static class NonDeterministicOperationException extends RuntimeException { public NonDeterministicOperationException(final String message) { super(message); From 84614213fdc59bca9fb0e2cc1a825de745f27a8d Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 12 Feb 2020 09:47:25 +0100 Subject: [PATCH 14/18] fix Signed-off-by: Abdelhamid Bakhta --- .../org/hyperledger/besu/ethereum/vm/TestBlockchain.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java index 5d0b6ac56f4..9e17ad87111 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java @@ -14,8 +14,7 @@ */ package org.hyperledger.besu.ethereum.vm; -import static java.nio.charset.StandardCharsets.UTF_8; - +import org.apache.tuweni.bytes.Bytes; import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.ChainHead; @@ -34,7 +33,7 @@ import java.util.Map; import java.util.Optional; -import org.apache.tuweni.bytes.Bytes; +import static java.nio.charset.StandardCharsets.UTF_8; /** * A blockchain mock for the Ethereum reference tests. @@ -151,12 +150,12 @@ public boolean removeObserver(final long observerId) { } @Override - public long observeChainReorg(ChainReorgObserver observer) { + public long observeChainReorg(final ChainReorgObserver observer) { throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); } @Override - public boolean removeChainReorgObserver(long observerId) { + public boolean removeChainReorgObserver(final long observerId) { throw new NonDeterministicOperationException("Listening for chain reorg is not deterministic"); } From bc65af99c4eddf0478a27ed23d5d1e306253c40b Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Wed, 12 Feb 2020 09:53:11 +0100 Subject: [PATCH 15/18] sportless apply Signed-off-by: Abdelhamid Bakhta --- .../org/hyperledger/besu/ethereum/vm/TestBlockchain.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java index 9e17ad87111..9f1790fea81 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/vm/TestBlockchain.java @@ -14,7 +14,8 @@ */ package org.hyperledger.besu.ethereum.vm; -import org.apache.tuweni.bytes.Bytes; +import static java.nio.charset.StandardCharsets.UTF_8; + import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.ChainHead; @@ -33,7 +34,7 @@ import java.util.Map; import java.util.Optional; -import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.tuweni.bytes.Bytes; /** * A blockchain mock for the Ethereum reference tests. From d8d2a711c90d3fb24e3ec40f0cedc00a528c4fac Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Thu, 13 Feb 2020 09:17:08 +0100 Subject: [PATCH 16/18] Address PR comments. Signed-off-by: Abdelhamid Bakhta --- .../api/query/TransactionLogsIndexer.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index d8a6dcce13d..abf8a534744 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -28,12 +28,13 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.time.Duration; +import java.util.Map; import java.util.Optional; +import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -50,6 +51,7 @@ public class TransactionLogsIndexer { private static final int BLOOM_BITS_LENGTH = 256; private static final int CACHE_FILE_SIZE = BLOCKS_PER_BLOOM_CACHE * BLOOM_BITS_LENGTH; public static final String PENDING = "pending"; + private final Map cachedSegments; private final Lock submissionLock = new ReentrantLock(); @@ -65,10 +67,11 @@ public TransactionLogsIndexer( this.blockchain = blockchain; this.cacheDir = cacheDir; this.scheduler = scheduler; + this.cachedSegments = new TreeMap<>(); } - public IndexingStatus indexAll() { - return generateLogBloomCache(0, Long.MAX_VALUE); + public void indexAll() { + ensurePreviousSegmentsArePresent(blockchain.getChainHeadBlockNumber()); } private static File calculateCacheFileName(final String name, final Path cacheDir) { @@ -138,7 +141,7 @@ private long fillCacheFile( public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { try { final long blockNumber = blockHeader.getNumber(); - LOG.info("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); + LOG.debug("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber)); ensurePreviousSegmentsArePresent(blockNumber); final File cacheFile = calculateCacheFileName(blockNumber, cacheDir); if (!cacheFile.exists()) { @@ -155,28 +158,24 @@ public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) { } private void ensurePreviousSegmentsArePresent(final long blockNumber) { - scheduler.scheduleFutureTask( - () -> { - long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1; - while (currentSegment > 0) { - try { - if (!isCachePresentForSegment(currentSegment)) { - final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE; - generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE); + if (!indexingStatus.isIndexing()) { + scheduler.scheduleFutureTask( + () -> { + long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1; + while (currentSegment > 0) { + try { + if (!cachedSegments.getOrDefault(currentSegment, false)) { + final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE; + generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE); + cachedSegments.put(currentSegment, true); + } + } finally { + currentSegment--; } - } catch (IOException e) { - LOG.error("Unhandled indexing exception.", e); - } finally { - currentSegment--; } - } - }, - Duration.ofSeconds(1)); - } - - private boolean isCachePresentForSegment(final long segment) throws IOException { - final File cacheFile = calculateCacheFileName(Long.toString(segment), cacheDir); - return cacheFile.exists() && FileChannel.open(cacheFile.toPath()).size() == CACHE_FILE_SIZE; + }, + Duration.ofSeconds(1)); + } } private void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos) From eb1facf87fd34badf59e09792716ac255a099f5e Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Thu, 13 Feb 2020 09:20:12 +0100 Subject: [PATCH 17/18] Remove unused constant. Signed-off-by: Abdelhamid Bakhta --- .../ethereum/api/query/TransactionLogsIndexer.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index abf8a534744..bb16399a71e 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -16,10 +16,9 @@ package org.hyperledger.besu.ethereum.api.query; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -39,9 +38,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; public class TransactionLogsIndexer { @@ -49,7 +48,6 @@ public class TransactionLogsIndexer { public static final int BLOCKS_PER_BLOOM_CACHE = 100_000; private static final int BLOOM_BITS_LENGTH = 256; - private static final int CACHE_FILE_SIZE = BLOCKS_PER_BLOOM_CACHE * BLOOM_BITS_LENGTH; public static final String PENDING = "pending"; private final Map cachedSegments; From 275ab563f17d4c331db26c96eab589b16dffd120 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Thu, 13 Feb 2020 09:24:44 +0100 Subject: [PATCH 18/18] spotless apply Signed-off-by: Abdelhamid Bakhta --- .../ethereum/api/query/TransactionLogsIndexer.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java index bb16399a71e..ad7ed9d9da2 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/TransactionLogsIndexer.java @@ -16,9 +16,10 @@ package org.hyperledger.besu.ethereum.api.query; -import com.fasterxml.jackson.annotation.JsonGetter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; @@ -38,9 +39,9 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class TransactionLogsIndexer {