Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6963071
First iteration. Draft PR.
AbdelStark Feb 5, 2020
ca62e34
fix SPDX header
AbdelStark Feb 5, 2020
388d0dc
Use block broadcaster to index log bloom.
AbdelStark Feb 5, 2020
94fccd4
Remove useless toString method
AbdelStark Feb 5, 2020
997c333
spotless apply
AbdelStark Feb 5, 2020
3092ddc
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 5, 2020
a29dcee
cacheLogsBloomForBlockHeader
AbdelStark Feb 6, 2020
34fe368
spotless apply
AbdelStark Feb 6, 2020
3daa9a4
ensurePreviousSegmentsArePresent
AbdelStark Feb 7, 2020
743448a
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 7, 2020
d7fbfdf
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 7, 2020
34b1e50
Added CLI flag to enable / disable automatic logs bloom indexing.
AbdelStark Feb 7, 2020
026d53f
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 10, 2020
f545f70
Create cache directory and cache file if not exist.
AbdelStark Feb 10, 2020
eb48fec
Fix acceptance test
AbdelStark Feb 10, 2020
4c7a20c
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 11, 2020
ec42713
Write cache for block only if block is new canonical head.
AbdelStark Feb 11, 2020
18f1a22
Handling of chain reorg.
AbdelStark Feb 12, 2020
8461421
fix
AbdelStark Feb 12, 2020
bc65af9
sportless apply
AbdelStark Feb 12, 2020
8a3c988
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 12, 2020
cd468ec
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 12, 2020
2cd0985
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 12, 2020
3d6efc3
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 12, 2020
393dddb
Merge remote-tracking branch 'upstream/master' into besu-169/auto-log…
AbdelStark Feb 13, 2020
d8d2a71
Address PR comments.
AbdelStark Feb 13, 2020
eb1facf
Remove unused constant.
AbdelStark Feb 13, 2020
275ab56
spotless apply
AbdelStark Feb 13, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ public void startNode(final BesuNode node) {
params.add("--key-value-storage");
params.add("rocksdb");

params.add("--auto-logs-bloom-indexing-enabled");
Copy link
Copy Markdown
Contributor

@shemnon shemnon Feb 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
params.add("--auto-logs-bloom-indexing-enabled");
params.add("--auto-log-bloom-caching-enabled");

params.add("false");

LOG.info("Creating besu process with params {}", params);
final ProcessBuilder processBuilder =
new ProcessBuilder(params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public void startNode(final BesuNode node) {
.map(EnodeURL::fromString)
.collect(Collectors.toList()))
.besuPluginContext(new BesuPluginContextImpl())
.autoLogsBloomIndexing(false)
Copy link
Copy Markdown
Contributor

@shemnon shemnon Feb 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.autoLogsBloomIndexing(false)
.autoLogBloomCaching(false)

.build();

runner.start();
Expand Down
14 changes: 12 additions & 2 deletions besu/src/main/java/org/hyperledger/besu/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.hyperledger.besu.ethereum.api.graphql.GraphQLHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcHttpService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.query.AutoTransactionLogsIndexingService;
import org.hyperledger.besu.ethereum.api.query.TransactionLogsIndexer;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.p2p.network.NetworkRunner;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.stratum.StratumServer;
Expand Down Expand Up @@ -58,6 +61,7 @@ public class Runner implements AutoCloseable {
private final BesuController<?> besuController;
private final Path dataDir;
private final Optional<StratumServer> stratumServer;
private final Optional<AutoTransactionLogsIndexingService> autoTransactionLogsIndexingService;

Runner(
final Vertx vertx,
Expand All @@ -69,7 +73,9 @@ public class Runner implements AutoCloseable {
final Optional<StratumServer> stratumServer,
final Optional<MetricsService> metrics,
final BesuController<?> besuController,
final Path dataDir) {
final Path dataDir,
final Optional<TransactionLogsIndexer> transactionLogsIndexer,
final Blockchain blockchain) {
this.vertx = vertx;
this.networkRunner = networkRunner;
this.natService = natService;
Expand All @@ -80,6 +86,9 @@ public class Runner implements AutoCloseable {
this.besuController = besuController;
this.dataDir = dataDir;
this.stratumServer = stratumServer;
this.autoTransactionLogsIndexingService =
transactionLogsIndexer.map(
indexer -> new AutoTransactionLogsIndexingService(blockchain, indexer));
}

public void start() {
Expand All @@ -103,6 +112,7 @@ public void start() {
LOG.info("Ethereum main loop is up.");
writeBesuPortsToFile();
writeBesuNetworksToFile();
autoTransactionLogsIndexingService.ifPresent(AutoTransactionLogsIndexingService::start);
} catch (final Exception ex) {
LOG.error("Startup failed", ex);
throw new IllegalStateException(ex);
Expand All @@ -125,7 +135,7 @@ public void stop() {

networkRunner.stop();
waitForServiceToStop("Network", networkRunner::awaitStop);

autoTransactionLogsIndexingService.ifPresent(AutoTransactionLogsIndexingService::stop);
natService.stop();
besuController.close();
vertx.close((res) -> vertxShutdownLatch.countDown());
Expand Down
10 changes: 9 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public class RunnerBuilder {
private Collection<EnodeURL> staticNodes = Collections.emptyList();
private Optional<String> identityString = Optional.empty();
private BesuPluginContextImpl besuPluginContext;
private boolean autoLogsBloomIndexing = true;

public RunnerBuilder vertx(final Vertx vertx) {
this.vertx = vertx;
Expand Down Expand Up @@ -270,6 +271,11 @@ public RunnerBuilder besuPluginContext(final BesuPluginContextImpl besuPluginCon
return this;
}

public RunnerBuilder autoLogsBloomIndexing(final boolean autoLogsBloomIndexing) {
this.autoLogsBloomIndexing = autoLogsBloomIndexing;
return this;
}

public Runner build() {

Preconditions.checkNotNull(besuController);
Expand Down Expand Up @@ -527,7 +533,9 @@ public Runner build() {
stratumServer,
metricsService,
besuController,
dataDir);
dataDir,
autoLogsBloomIndexing ? blockchainQueries.getTransactionLogsIndexer() : Optional.empty(),
context.getBlockchain());
}

private Optional<NodePermissioningController> buildNodePermissioningController(
Expand Down
7 changes: 7 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,12 @@ void setBannedNodeIds(final List<String> values) {
arity = "1")
private String keyValueStorageName = DEFAULT_KEY_VALUE_STORAGE_NAME;

@Option(
names = {"--auto-logs-bloom-indexing-enabled"},
description = "Enable Automatic logs bloom indexing (default: ${DEFAULT-VALUE})",
arity = "1")
private final Boolean autoLogsBloomIndexingEnabled = true;

@Option(
names = {"--override-genesis-config"},
paramLabel = "NAME=VALUE",
Expand Down Expand Up @@ -1695,6 +1701,7 @@ private void synchronize(
.staticNodes(staticNodes)
.identityString(identityString)
.besuPluginContext(besuPluginContext)
.autoLogsBloomIndexing(autoLogsBloomIndexingEnabled)
.build();

addShutdownHook(runner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void callingBesuCommandWithoutOptionsMustSyncWithDefaultValues() throws E
verify(mockRunnerBuilder).webSocketConfiguration(eq(DEFAULT_WEB_SOCKET_CONFIGURATION));
verify(mockRunnerBuilder).metricsConfiguration(eq(DEFAULT_METRICS_CONFIGURATION));
verify(mockRunnerBuilder).ethNetworkConfig(ethNetworkArg.capture());
verify(mockRunnerBuilder).autoLogsBloomIndexing(eq(true));
verify(mockRunnerBuilder).build();

verify(mockControllerBuilderFactory).fromEthNetworkConfig(ethNetworkArg.capture(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public void initMocks() throws Exception {
when(mockRunnerBuilder.staticNodes(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.identityString(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.besuPluginContext(any())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.autoLogsBloomIndexing(anyBoolean())).thenReturn(mockRunnerBuilder);
when(mockRunnerBuilder.build()).thenReturn(mockRunner);

when(storageService.getByName("rocksdb")).thenReturn(Optional.of(rocksDBStorageFactory));
Expand Down
4 changes: 3 additions & 1 deletion besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,6 @@ revert-reason-enabled=false
key-value-storage="rocksdb"

# Gas limit
target-gas-limit=8000000
target-gas-limit=8000000

auto-logs-bloom-indexing-enabled=true
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.query;

import org.hyperledger.besu.ethereum.chain.Blockchain;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.OptionalLong;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class AutoTransactionLogsIndexingService {
protected static final Logger LOG = LogManager.getLogger();
private final Blockchain blockchain;
private final TransactionLogsIndexer transactionLogsIndexer;
private OptionalLong blockAddedSubscriptionId = OptionalLong.empty();
private OptionalLong chainReorgSubscriptionId = OptionalLong.empty();

public AutoTransactionLogsIndexingService(
final Blockchain blockchain, final TransactionLogsIndexer transactionLogsIndexer) {
this.blockchain = blockchain;
this.transactionLogsIndexer = transactionLogsIndexer;
}

public void start() {
try {
LOG.info("Starting Auto transaction logs indexing service.");
final Path cacheDir = transactionLogsIndexer.getCacheDir();
if (!cacheDir.toFile().exists() || !cacheDir.toFile().isDirectory()) {
Files.createDirectory(cacheDir);
}
blockAddedSubscriptionId =
OptionalLong.of(
blockchain.observeBlockAdded(
(event, __) -> {
if (event.isNewCanonicalHead()) {
transactionLogsIndexer.cacheLogsBloomForBlockHeader(
event.getBlock().getHeader());
}
}));
chainReorgSubscriptionId =
OptionalLong.of(
blockchain.observeChainReorg(
(header, __) -> transactionLogsIndexer.cacheLogsBloomForBlockHeader(header)));

transactionLogsIndexer
.getScheduler()
.scheduleFutureTask(transactionLogsIndexer::indexAll, Duration.ofMinutes(1));
} catch (IOException e) {
LOG.error("Unhandled indexing exception.", e);
}
}

public void stop() {
LOG.info("Shutting down Auto transaction logs indexing service.");
blockAddedSubscriptionId.ifPresent(blockchain::removeObserver);
chainReorgSubscriptionId.ifPresent(blockchain::removeChainReorgObserver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -44,11 +48,15 @@ public class TransactionLogsIndexer {
private static final Logger LOG = LogManager.getLogger();

public static final int BLOCKS_PER_BLOOM_CACHE = 100_000;
private static final int BLOOM_BITS_LENGTH = 256;
public static final String PENDING = "pending";
private final Map<Long, Boolean> cachedSegments;

private final Lock submissionLock = new ReentrantLock();

private final EthScheduler scheduler;
private final Blockchain blockchain;

private final Path cacheDir;

private final IndexingStatus indexingStatus = new IndexingStatus();
Expand All @@ -58,6 +66,11 @@ public TransactionLogsIndexer(
this.blockchain = blockchain;
this.cacheDir = cacheDir;
this.scheduler = scheduler;
this.cachedSegments = new TreeMap<>();
}

public void indexAll() {
ensurePreviousSegmentsArePresent(blockchain.getChainHeadBlockNumber());
}

private static File calculateCacheFileName(final String name, final Path cacheDir) {
Expand Down Expand Up @@ -117,16 +130,64 @@ private long fillCacheFile(
if (maybeHeader.isEmpty()) {
break;
}
final byte[] logs = maybeHeader.get().getLogsBloom().toArray();
checkNotNull(logs);
checkState(logs.length == 256, "BloomBits are not the correct length");
fos.write(logs);
fillCacheFileWithBlock(maybeHeader.get(), fos);
indexingStatus.currentBlock = blockNum;
blockNum++;
}
return blockNum - startBlock;
}

public void cacheLogsBloomForBlockHeader(final BlockHeader blockHeader) {
try {
final long blockNumber = blockHeader.getNumber();
LOG.debug("Caching logs bloom for block {}.", "0x" + Long.toHexString(blockNumber));
ensurePreviousSegmentsArePresent(blockNumber);
final File cacheFile = calculateCacheFileName(blockNumber, cacheDir);
if (!cacheFile.exists()) {
Files.createFile(cacheFile.toPath());
}
try (RandomAccessFile writer = new RandomAccessFile(cacheFile, "rw")) {
final long offset = (blockNumber / BLOCKS_PER_BLOOM_CACHE) * BLOOM_BITS_LENGTH;
writer.seek(offset);
writer.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray()));
}
} catch (IOException e) {
LOG.error("Unhandled indexing exception.", e);
}
}

private void ensurePreviousSegmentsArePresent(final long blockNumber) {
if (!indexingStatus.isIndexing()) {
scheduler.scheduleFutureTask(
() -> {
long currentSegment = (blockNumber / BLOCKS_PER_BLOOM_CACHE) - 1;
while (currentSegment > 0) {
try {
if (!cachedSegments.getOrDefault(currentSegment, false)) {
final long startBlock = currentSegment * BLOCKS_PER_BLOOM_CACHE;
generateLogBloomCache(startBlock, startBlock + BLOCKS_PER_BLOOM_CACHE);
cachedSegments.put(currentSegment, true);
}
} finally {
currentSegment--;
}
}
},
Duration.ofSeconds(1));
}
}

private void fillCacheFileWithBlock(final BlockHeader blockHeader, final FileOutputStream fos)
throws IOException {
fos.write(ensureBloomBitsAreCorrectLength(blockHeader.getLogsBloom().toArray()));
}

private byte[] ensureBloomBitsAreCorrectLength(final byte[] logs) {
checkNotNull(logs);
checkState(logs.length == BLOOM_BITS_LENGTH, "BloomBits are not the correct length");
return logs;
}

public IndexingStatus requestIndexing(final long fromBlock, final long toBlock) {
boolean requestAccepted = false;
try {
Expand All @@ -152,6 +213,14 @@ public IndexingStatus requestIndexing(final long fromBlock, final long toBlock)
return indexingStatus;
}

public EthScheduler getScheduler() {
return scheduler;
}

Path getCacheDir() {
return cacheDir;
}

public static final class IndexingStatus {
long startBlock;
long endBlock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,22 @@ default long observeLogs(final Consumer<LogWithMetadata> logObserver) {
* @return {@code true} if the observer was removed; otherwise {@code false}
*/
boolean removeObserver(long observerId);

/**
* Adds an observer that will get called when a new block is added after reorg.
*
* <p><i>No guarantees are made about the order in which observers are invoked.</i>
*
* @param observer the observer to call
* @return the observer ID that can be used to remove it later.
*/
long observeChainReorg(ChainReorgObserver observer);

/**
* Removes a previously added {@link ChainReorgObserver}.
*
* @param observerId the ID of the observer to remove
* @return {@code true} if the observer was removed; otherwise {@code false}
*/
boolean removeChainReorgObserver(long observerId);
}
Loading