Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ allprojects {
}

// Below this line are currently only license header tasks
format 'groovy', {
target '**/src/*/grovy/**/*.groovy'
}
format 'groovy', { target '**/src/*/grovy/**/*.groovy' }

// Currently disabled due to referencetest issues
// format 'bash', {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.worldstate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain;

import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.core.WorldState;
import org.hyperledger.besu.ethereum.rlp.RLP;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.MockExecutorService;
import org.hyperledger.besu.util.bytes.Bytes32;
import org.hyperledger.besu.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.junit.Test;

public class PrunerIntegrationTest {

private final BlockDataGenerator gen = new BlockDataGenerator();
private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem();
private final Map<BytesValue, byte[]> hashValueStore = new HashMap<>();
private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore);
private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage);
private final WorldStateArchive worldStateArchive =
new WorldStateArchive(
worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage()));
private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage();
private final Block genesisBlock = gen.genesisBlock();
private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock);

@Test
public void pruner_smallState_manyOpsPerTx() throws InterruptedException {
testPruner(3, 1, 1, 4, 1000);
}

@Test
public void pruner_largeState_fewOpsPerTx() throws InterruptedException {
testPruner(2, 5, 5, 6, 5);
}

@Test
public void pruner_emptyBlocks() throws InterruptedException {
testPruner(5, 0, 2, 5, 10);
}

@Test
public void pruner_markChainhead() throws InterruptedException {
testPruner(4, 2, 1, 10, 20);
}

@Test
public void pruner_lowRelativeBlockConfirmations() throws InterruptedException {
testPruner(3, 2, 1, 4, 20);
}

@Test
public void pruner_highRelativeBlockConfirmations() throws InterruptedException {
testPruner(3, 2, 9, 10, 20);
}

private void testPruner(
final int numCycles,
final int accountsPerBlock,
final long blockConfirmations,
final int numBlocksToKeep,
final int opsPerTransaction)
throws InterruptedException {

final var markSweepPruner =
new MarkSweepPruner(
worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction);
final var pruner =
new Pruner(
markSweepPruner,
blockchain,
new MockExecutorService(),
new PruningConfiguration(blockConfirmations, numBlocksToKeep));

pruner.start();

for (int cycle = 0; cycle < numCycles; ++cycle) {
int numBlockInCycle =
numBlocksToKeep
+ 1; // +1 to get it to switch from MARKING_COMPLETE TO SWEEPING on each cycle
var fullyMarkedBlockNum = cycle * numBlockInCycle + 1;

// This should cause a full mark and sweep cycle
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);
generateBlockchainData(numBlockInCycle, accountsPerBlock);
assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE);

// Collect the nodes we expect to keep
final Set<BytesValue> expectedNodes = new HashSet<>();
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
collectWorldStateNodes(stateRoot, expectedNodes);
}

if (accountsPerBlock != 0) {
assertThat(hashValueStore.size())
.isGreaterThanOrEqualTo(expectedNodes.size()); // Sanity check
}

// Assert that blocks from mark point onward are still accessible
for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) {
final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot();
assertThat(worldStateArchive.get(stateRoot)).isPresent();
final WorldState markedState = worldStateArchive.get(stateRoot).get();
// Traverse accounts and make sure all are accessible
final int expectedAccounts = accountsPerBlock * i;
final long accounts =
markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count();
assertThat(accounts).isEqualTo(expectedAccounts);
// Traverse storage to ensure that all storage is accessible
markedState
.streamAccounts(Bytes32.ZERO, expectedAccounts * 2)
.forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000));
}

// All other state roots should have been removed
for (int i = 0; i < fullyMarkedBlockNum; i++) {
final BlockHeader curHeader = blockchain.getBlockHeader(i).get();
if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) {
assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty();
}
}

// Check that storage contains only the values we expect
assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size());
assertThat(hashValueStore.values())
.containsExactlyInAnyOrderElementsOf(
expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet()));
}

pruner.stop();
}

private void generateBlockchainData(final int numBlocks, final int numAccounts) {
Block parentBlock = blockchain.getChainHeadBlock();
for (int i = 0; i < numBlocks; i++) {
final MutableWorldState worldState =
worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get();
gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts);
final Hash stateRoot = worldState.rootHash();

final Block block =
gen.block(
BlockOptions.create()
.setStateRoot(stateRoot)
.setBlockNumber(parentBlock.getHeader().getNumber() + 1L)
.setParentHash(parentBlock.getHash()));
final List<TransactionReceipt> receipts = gen.receipts(block);
blockchain.appendBlock(block, receipts);
parentBlock = block;
}
}

private Set<BytesValue> collectWorldStateNodes(
final Hash stateRootHash, final Set<BytesValue> collector) {
final List<Hash> storageRoots = new ArrayList<>();
final MerklePatriciaTrie<Bytes32, BytesValue> stateTrie = createStateTrie(stateRootHash);

// Collect storage roots and code
stateTrie
.entriesFrom(Bytes32.ZERO, 1000)
.forEach(
(key, val) -> {
final StateTrieAccountValue accountValue =
StateTrieAccountValue.readFrom(RLP.input(val));
stateStorage
.get(accountValue.getCodeHash().getArrayUnsafe())
.ifPresent(v -> collector.add(BytesValue.wrap(v)));
storageRoots.add(accountValue.getStorageRoot());
});

// Collect state nodes
collectTrieNodes(stateTrie, collector);
// Collect storage nodes
for (Hash storageRoot : storageRoots) {
final MerklePatriciaTrie<Bytes32, BytesValue> storageTrie = createStorageTrie(storageRoot);
collectTrieNodes(storageTrie, collector);
}

return collector;
}

private void collectTrieNodes(
final MerklePatriciaTrie<Bytes32, BytesValue> trie, final Set<BytesValue> collector) {
final Bytes32 rootHash = trie.getRootHash();
trie.visitAll(
(node) -> {
if (node.isReferencedByHash() || node.getHash().equals(rootHash)) {
collector.add(node.getRlp());
}
});
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStateTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStorageTrie(final Bytes32 rootHash) {
return new StoredMerklePatriciaTrie<>(
worldStateStorage::getAccountStorageTrieNode,
rootHash,
Function.identity(),
Function.identity());
}

// Proxy class so that we have access to the constructor that takes our own map
private static class TestInMemoryStorage extends InMemoryKeyValueStorage {

public TestInMemoryStorage(final Map<BytesValue, byte[]> hashValueStore) {
super(hashValueStore);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class MarkSweepPruner {
private final Counter sweptNodesCounter;
private volatile long nodeAddedListenerId;
private final ReentrantLock markLock = new ReentrantLock(true);
private final Set<BytesValue> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Bytes32> pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>());

public MarkSweepPruner(
final WorldStateStorage worldStateStorage,
Expand Down Expand Up @@ -98,7 +98,9 @@ public MarkSweepPruner(

public void prepare() {
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case.
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes);
markStorage.clear();
pendingMarks.clear();
nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes);
}

public void cleanup() {
Expand All @@ -107,7 +109,6 @@ public void cleanup() {

public void mark(final Hash rootHash) {
markOperationCounter.inc();
markStorage.clear();
createStateTrie(rootHash)
.visitAll(
node -> {
Expand All @@ -119,13 +120,12 @@ public void mark(final Hash rootHash) {
markNode(node.getHash());
node.getValue().ifPresent(this::processAccountState);
});
LOG.info("Completed marking used nodes for pruning");
LOG.debug("Completed marking used nodes for pruning");
}

public void sweepBefore(final long markedBlockNumber) {
flushPendingMarks();
sweepOperationCounter.inc();
LOG.info("Sweeping unused nodes");
LOG.debug("Sweeping unused nodes");
// Sweep state roots first, walking backwards until we get to a state root that isn't in the
// storage
long prunedNodeCount = 0;
Expand All @@ -138,7 +138,7 @@ public void sweepBefore(final long markedBlockNumber) {
break;
}

if (!markStorage.containsKey(candidateStateRootHash.getArrayUnsafe())) {
if (!isMarked(candidateStateRootHash)) {
updater.removeAccountStateTrieNode(candidateStateRootHash);
prunedNodeCount++;
if (prunedNodeCount % operationsPerTransaction == 0) {
Expand All @@ -149,11 +149,19 @@ public void sweepBefore(final long markedBlockNumber) {
}
updater.commit();
// Sweep non-state-root nodes
prunedNodeCount += worldStateStorage.prune(markStorage::containsKey);
prunedNodeCount += worldStateStorage.prune(this::isMarked);
sweptNodesCounter.inc(prunedNodeCount);
worldStateStorage.removeNodeAddedListener(nodeAddedListenerId);
markStorage.clear();
LOG.info("Completed sweeping unused nodes");
LOG.debug("Completed sweeping unused nodes");
}

private boolean isMarked(final Bytes32 key) {
return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe());
}

private boolean isMarked(final byte[] key) {
return pendingMarks.contains(Bytes32.wrap(key)) || markStorage.containsKey(key);
}

private MerklePatriciaTrie<Bytes32, BytesValue> createStateTrie(final Bytes32 rootHash) {
Expand Down Expand Up @@ -182,10 +190,14 @@ private void processAccountState(final BytesValue value) {

@VisibleForTesting
void markNode(final Bytes32 hash) {
markedNodesCounter.inc();
markNodes(Collections.singleton(hash));
}

private void markNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.add(hash);
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
Expand All @@ -209,15 +221,4 @@ void flushPendingMarks() {
markLock.unlock();
}
}

private void markNewNodes(final Collection<Bytes32> nodeHashes) {
markedNodesCounter.inc(nodeHashes.size());
markLock.lock();
try {
pendingMarks.addAll(nodeHashes);
maybeFlushPendingMarks();
} finally {
markLock.unlock();
}
}
}
Loading