diff --git a/build.gradle b/build.gradle index fe8b7877756..18003d96f0d 100644 --- a/build.gradle +++ b/build.gradle @@ -164,9 +164,7 @@ allprojects { } // Below this line are currently only license header tasks - format 'groovy', { - target '**/src/*/grovy/**/*.groovy' - } + format 'groovy', { target '**/src/*/grovy/**/*.groovy' } // Currently disabled due to referencetest issues // format 'bash', { diff --git a/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java new file mode 100644 index 00000000000..4008ef9b667 --- /dev/null +++ b/ethereum/core/src/integration-test/java/org/hyperledger/besu/ethereum/worldstate/PrunerIntegrationTest.java @@ -0,0 +1,255 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.worldstate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.core.InMemoryStorageProvider.createInMemoryBlockchain; + +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.MutableWorldState; +import org.hyperledger.besu.ethereum.core.TransactionReceipt; +import org.hyperledger.besu.ethereum.core.WorldState; +import org.hyperledger.besu.ethereum.rlp.RLP; +import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.storage.keyvalue.WorldStatePreimageKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.MerklePatriciaTrie; +import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; +import org.hyperledger.besu.testutil.MockExecutorService; +import org.hyperledger.besu.util.bytes.Bytes32; +import org.hyperledger.besu.util.bytes.BytesValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.junit.Test; + +public class PrunerIntegrationTest { + + private final BlockDataGenerator gen = new BlockDataGenerator(); + private final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem(); + private final Map hashValueStore = new HashMap<>(); + private final InMemoryKeyValueStorage stateStorage = new TestInMemoryStorage(hashValueStore); + private final WorldStateStorage worldStateStorage = new WorldStateKeyValueStorage(stateStorage); + private final WorldStateArchive worldStateArchive = + new WorldStateArchive( + worldStateStorage, new WorldStatePreimageKeyValueStorage(new InMemoryKeyValueStorage())); + private final InMemoryKeyValueStorage markStorage = new InMemoryKeyValueStorage(); + private final Block genesisBlock = gen.genesisBlock(); + private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock); + + @Test + public void pruner_smallState_manyOpsPerTx() throws InterruptedException { + testPruner(3, 1, 1, 4, 1000); + } + + @Test + public void pruner_largeState_fewOpsPerTx() throws InterruptedException { + testPruner(2, 5, 5, 6, 5); + } + + @Test + public void pruner_emptyBlocks() throws InterruptedException { + testPruner(5, 0, 2, 5, 10); + } + + @Test + public void pruner_markChainhead() throws InterruptedException { + testPruner(4, 2, 1, 10, 20); + } + + @Test + public void pruner_lowRelativeBlockConfirmations() throws InterruptedException { + testPruner(3, 2, 1, 4, 20); + } + + @Test + public void pruner_highRelativeBlockConfirmations() throws InterruptedException { + testPruner(3, 2, 9, 10, 20); + } + + private void testPruner( + final int numCycles, + final int accountsPerBlock, + final long blockConfirmations, + final int numBlocksToKeep, + final int opsPerTransaction) + throws InterruptedException { + + final var markSweepPruner = + new MarkSweepPruner( + worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction); + final var pruner = + new Pruner( + markSweepPruner, + blockchain, + new MockExecutorService(), + new PruningConfiguration(blockConfirmations, numBlocksToKeep)); + + pruner.start(); + + for (int cycle = 0; cycle < numCycles; ++cycle) { + int numBlockInCycle = + numBlocksToKeep + + 1; // +1 to get it to switch from MARKING_COMPLETE TO SWEEPING on each cycle + var fullyMarkedBlockNum = cycle * numBlockInCycle + 1; + + // This should cause a full mark and sweep cycle + assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); + generateBlockchainData(numBlockInCycle, accountsPerBlock); + assertThat(pruner.getState()).isEqualByComparingTo(Pruner.State.IDLE); + + // Collect the nodes we expect to keep + final Set expectedNodes = new HashSet<>(); + for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) { + final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); + collectWorldStateNodes(stateRoot, expectedNodes); + } + + if (accountsPerBlock != 0) { + assertThat(hashValueStore.size()) + .isGreaterThanOrEqualTo(expectedNodes.size()); // Sanity check + } + + // Assert that blocks from mark point onward are still accessible + for (int i = fullyMarkedBlockNum; i <= blockchain.getChainHeadBlockNumber(); i++) { + final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); + assertThat(worldStateArchive.get(stateRoot)).isPresent(); + final WorldState markedState = worldStateArchive.get(stateRoot).get(); + // Traverse accounts and make sure all are accessible + final int expectedAccounts = accountsPerBlock * i; + final long accounts = + markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count(); + assertThat(accounts).isEqualTo(expectedAccounts); + // Traverse storage to ensure that all storage is accessible + markedState + .streamAccounts(Bytes32.ZERO, expectedAccounts * 2) + .forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000)); + } + + // All other state roots should have been removed + for (int i = 0; i < fullyMarkedBlockNum; i++) { + final BlockHeader curHeader = blockchain.getBlockHeader(i).get(); + if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) { + assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty(); + } + } + + // Check that storage contains only the values we expect + assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size()); + assertThat(hashValueStore.values()) + .containsExactlyInAnyOrderElementsOf( + expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet())); + } + + pruner.stop(); + } + + private void generateBlockchainData(final int numBlocks, final int numAccounts) { + Block parentBlock = blockchain.getChainHeadBlock(); + for (int i = 0; i < numBlocks; i++) { + final MutableWorldState worldState = + worldStateArchive.getMutable(parentBlock.getHeader().getStateRoot()).get(); + gen.createRandomContractAccountsWithNonEmptyStorage(worldState, numAccounts); + final Hash stateRoot = worldState.rootHash(); + + final Block block = + gen.block( + BlockOptions.create() + .setStateRoot(stateRoot) + .setBlockNumber(parentBlock.getHeader().getNumber() + 1L) + .setParentHash(parentBlock.getHash())); + final List receipts = gen.receipts(block); + blockchain.appendBlock(block, receipts); + parentBlock = block; + } + } + + private Set collectWorldStateNodes( + final Hash stateRootHash, final Set collector) { + final List storageRoots = new ArrayList<>(); + final MerklePatriciaTrie stateTrie = createStateTrie(stateRootHash); + + // Collect storage roots and code + stateTrie + .entriesFrom(Bytes32.ZERO, 1000) + .forEach( + (key, val) -> { + final StateTrieAccountValue accountValue = + StateTrieAccountValue.readFrom(RLP.input(val)); + stateStorage + .get(accountValue.getCodeHash().getArrayUnsafe()) + .ifPresent(v -> collector.add(BytesValue.wrap(v))); + storageRoots.add(accountValue.getStorageRoot()); + }); + + // Collect state nodes + collectTrieNodes(stateTrie, collector); + // Collect storage nodes + for (Hash storageRoot : storageRoots) { + final MerklePatriciaTrie storageTrie = createStorageTrie(storageRoot); + collectTrieNodes(storageTrie, collector); + } + + return collector; + } + + private void collectTrieNodes( + final MerklePatriciaTrie trie, final Set collector) { + final Bytes32 rootHash = trie.getRootHash(); + trie.visitAll( + (node) -> { + if (node.isReferencedByHash() || node.getHash().equals(rootHash)) { + collector.add(node.getRlp()); + } + }); + } + + private MerklePatriciaTrie createStateTrie(final Bytes32 rootHash) { + return new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStateTrieNode, + rootHash, + Function.identity(), + Function.identity()); + } + + private MerklePatriciaTrie createStorageTrie(final Bytes32 rootHash) { + return new StoredMerklePatriciaTrie<>( + worldStateStorage::getAccountStorageTrieNode, + rootHash, + Function.identity(), + Function.identity()); + } + + // Proxy class so that we have access to the constructor that takes our own map + private static class TestInMemoryStorage extends InMemoryKeyValueStorage { + + public TestInMemoryStorage(final Map hashValueStore) { + super(hashValueStore); + } + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java index efa3b38a0eb..951e265de7d 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPruner.java @@ -54,7 +54,7 @@ public class MarkSweepPruner { private final Counter sweptNodesCounter; private volatile long nodeAddedListenerId; private final ReentrantLock markLock = new ReentrantLock(true); - private final Set pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set pendingMarks = Collections.newSetFromMap(new ConcurrentHashMap<>()); public MarkSweepPruner( final WorldStateStorage worldStateStorage, @@ -98,7 +98,9 @@ public MarkSweepPruner( public void prepare() { worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); // Just in case. - nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNewNodes); + markStorage.clear(); + pendingMarks.clear(); + nodeAddedListenerId = worldStateStorage.addNodeAddedListener(this::markNodes); } public void cleanup() { @@ -107,7 +109,6 @@ public void cleanup() { public void mark(final Hash rootHash) { markOperationCounter.inc(); - markStorage.clear(); createStateTrie(rootHash) .visitAll( node -> { @@ -119,13 +120,12 @@ public void mark(final Hash rootHash) { markNode(node.getHash()); node.getValue().ifPresent(this::processAccountState); }); - LOG.info("Completed marking used nodes for pruning"); + LOG.debug("Completed marking used nodes for pruning"); } public void sweepBefore(final long markedBlockNumber) { - flushPendingMarks(); sweepOperationCounter.inc(); - LOG.info("Sweeping unused nodes"); + LOG.debug("Sweeping unused nodes"); // Sweep state roots first, walking backwards until we get to a state root that isn't in the // storage long prunedNodeCount = 0; @@ -138,7 +138,7 @@ public void sweepBefore(final long markedBlockNumber) { break; } - if (!markStorage.containsKey(candidateStateRootHash.getArrayUnsafe())) { + if (!isMarked(candidateStateRootHash)) { updater.removeAccountStateTrieNode(candidateStateRootHash); prunedNodeCount++; if (prunedNodeCount % operationsPerTransaction == 0) { @@ -149,11 +149,19 @@ public void sweepBefore(final long markedBlockNumber) { } updater.commit(); // Sweep non-state-root nodes - prunedNodeCount += worldStateStorage.prune(markStorage::containsKey); + prunedNodeCount += worldStateStorage.prune(this::isMarked); sweptNodesCounter.inc(prunedNodeCount); worldStateStorage.removeNodeAddedListener(nodeAddedListenerId); markStorage.clear(); - LOG.info("Completed sweeping unused nodes"); + LOG.debug("Completed sweeping unused nodes"); + } + + private boolean isMarked(final Bytes32 key) { + return pendingMarks.contains(key) || markStorage.containsKey(key.getArrayUnsafe()); + } + + private boolean isMarked(final byte[] key) { + return pendingMarks.contains(Bytes32.wrap(key)) || markStorage.containsKey(key); } private MerklePatriciaTrie createStateTrie(final Bytes32 rootHash) { @@ -182,10 +190,14 @@ private void processAccountState(final BytesValue value) { @VisibleForTesting void markNode(final Bytes32 hash) { - markedNodesCounter.inc(); + markNodes(Collections.singleton(hash)); + } + + private void markNodes(final Collection nodeHashes) { + markedNodesCounter.inc(nodeHashes.size()); markLock.lock(); try { - pendingMarks.add(hash); + pendingMarks.addAll(nodeHashes); maybeFlushPendingMarks(); } finally { markLock.unlock(); @@ -209,15 +221,4 @@ void flushPendingMarks() { markLock.unlock(); } } - - private void markNewNodes(final Collection nodeHashes) { - markedNodesCounter.inc(nodeHashes.size()); - markLock.lock(); - try { - pendingMarks.addAll(nodeHashes); - maybeFlushPendingMarks(); - } finally { - markLock.unlock(); - } - } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java index b76a7978ebf..fb8c7e5116f 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/Pruner.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.ethereum.worldstate; +import static com.google.common.base.Preconditions.checkArgument; + import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -23,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,15 +51,13 @@ public Pruner( this.blockchain = blockchain; this.blocksRetained = pruningConfiguration.getBlocksRetained(); this.blockConfirmations = pruningConfiguration.getBlockConfirmations(); - if (blockConfirmations < 0 || blocksRetained < 0) { - throw new IllegalArgumentException( - String.format( - "blockConfirmations and blocksRetained must be non-negative. blockConfirmations=%d, blocksRetained=%d", - blockConfirmations, blocksRetained)); - } + checkArgument( + blockConfirmations >= 0 && blockConfirmations < blocksRetained, + "blockConfirmations and blocksRetained must be non-negative. blockConfirmations must be less than blockRetained."); } public void start() { + LOG.info("Starting Pruner."); blockchain.observeBlockAdded((event, blockchain) -> handleNewBlock(event)); } @@ -88,7 +89,7 @@ private void handleNewBlock(final BlockAddedEvent event) { private void mark(final BlockHeader header) { markBlockNumber = header.getNumber(); final Hash stateRoot = header.getStateRoot(); - LOG.info( + LOG.debug( "Begin marking used nodes for pruning. Block number: {} State root: {}", markBlockNumber, stateRoot); @@ -100,7 +101,10 @@ private void mark(final BlockHeader header) { } private void sweep() { - LOG.info("Begin sweeping unused nodes for pruning. Retention period: {}", blocksRetained); + LOG.debug( + "Begin sweeping unused nodes for pruning. Keeping full state for blocks {} to {}", + markBlockNumber, + markBlockNumber + blocksRetained); execute( () -> { pruningStrategy.sweepBefore(markBlockNumber); @@ -117,7 +121,12 @@ private void execute(final Runnable action) { } } - private enum State { + @VisibleForTesting + State getState() { + return state.get(); + } + + enum State { IDLE, MARK_BLOCK_CONFIRMATIONS_AWAITING, MARKING, diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java index efa0c089b98..e2fcddab72a 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/MarkSweepPrunerTest.java @@ -65,105 +65,6 @@ public class MarkSweepPrunerTest { private final Block genesisBlock = gen.genesisBlock(); private final MutableBlockchain blockchain = createInMemoryBlockchain(genesisBlock); - @Test - public void prepareMarkAndSweep_smallState_manyOpsPerTx() { - testPrepareMarkAndSweep(3, 1, 2, 1000); - } - - @Test - public void prepareMarkAndSweep_largeState_fewOpsPerTx() { - testPrepareMarkAndSweep(20, 5, 5, 5); - } - - @Test - public void prepareMarkAndSweep_emptyBlocks() { - testPrepareMarkAndSweep(10, 0, 5, 10); - } - - @Test - public void prepareMarkAndSweep_markChainhead() { - testPrepareMarkAndSweep(10, 2, 10, 20); - } - - @Test - public void prepareMarkAndSweep_markGenesis() { - testPrepareMarkAndSweep(10, 2, 0, 20); - } - - @Test - public void prepareMarkAndSweep_multipleRounds() { - testPrepareMarkAndSweep(10, 2, 10, 20); - testPrepareMarkAndSweep(10, 2, 15, 20); - } - - private void testPrepareMarkAndSweep( - final int numBlocks, - final int accountsPerBlock, - final int markBlockNumber, - final int opsPerTransaction) { - final MarkSweepPruner pruner = - new MarkSweepPruner( - worldStateStorage, blockchain, markStorage, metricsSystem, opsPerTransaction); - final int chainHeight = (int) blockchain.getChainHead().getHeight(); - // Generate blocks up to markBlockNumber - final int blockCountBeforeMarkedBlock = markBlockNumber - chainHeight; - generateBlockchainData(blockCountBeforeMarkedBlock, accountsPerBlock); - - // Prepare - pruner.prepare(); - // Mark - final BlockHeader markBlock = blockchain.getBlockHeader(markBlockNumber).get(); - pruner.mark(markBlock.getStateRoot()); - - // Generate more blocks that should be kept - generateBlockchainData(numBlocks - blockCountBeforeMarkedBlock, accountsPerBlock); - - // Collect the nodes we expect to keep - final Set expectedNodes = collectWorldStateNodes(markBlock.getStateRoot()); - for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) { - final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); - collectWorldStateNodes(stateRoot, expectedNodes); - } - if (accountsPerBlock != 0 && markBlockNumber > 0) { - assertThat(hashValueStore.size()).isGreaterThan(expectedNodes.size()); // Sanity check - } - - // Sweep - pruner.sweepBefore(markBlock.getNumber()); - - // Assert that blocks from mark point onward are still accessible - for (int i = markBlockNumber; i <= blockchain.getChainHeadBlockNumber(); i++) { - final Hash stateRoot = blockchain.getBlockHeader(i).get().getStateRoot(); - assertThat(worldStateArchive.get(stateRoot)).isPresent(); - final WorldState markedState = worldStateArchive.get(stateRoot).get(); - // Traverse accounts and make sure all are accessible - final int expectedAccounts = accountsPerBlock * i; - final long accounts = markedState.streamAccounts(Bytes32.ZERO, expectedAccounts * 2).count(); - assertThat(accounts).isEqualTo(expectedAccounts); - // Traverse storage to ensure that all storage is accessible - markedState - .streamAccounts(Bytes32.ZERO, expectedAccounts * 2) - .forEach(a -> a.storageEntriesFrom(Bytes32.ZERO, 1000)); - } - - // All other state roots should have been removed - for (int i = 0; i < markBlockNumber; i++) { - final BlockHeader curHeader = blockchain.getBlockHeader(i + 1L).get(); - if (curHeader.getNumber() == markBlock.getNumber()) { - continue; - } - if (!curHeader.getStateRoot().equals(Hash.EMPTY_TRIE_HASH)) { - assertThat(worldStateArchive.get(curHeader.getStateRoot())).isEmpty(); - } - } - - // Check that storage contains only the values we expect - assertThat(hashValueStore.size()).isEqualTo(expectedNodes.size()); - assertThat(hashValueStore.values()) - .containsExactlyInAnyOrderElementsOf( - expectedNodes.stream().map(BytesValue::getArrayUnsafe).collect(Collectors.toSet())); - } - @Test public void mark_marksAllExpectedNodes() { final MarkSweepPruner pruner = @@ -362,6 +263,7 @@ private MerklePatriciaTrie createStorageTrie(final Bytes32 Function.identity()); } + // Proxy class so that we have access to the constructor that takes our own map private static class TestInMemoryStorage extends InMemoryKeyValueStorage { public TestInMemoryStorage(final Map hashValueStore) { diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java index 1b13cdbcdb2..b60fd543481 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/worldstate/PrunerTest.java @@ -65,7 +65,7 @@ public void shouldMarkCorrectBlockAndSweep() throws InterruptedException { final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0)); + markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); pruner.start(); final Block block1 = appendBlockWithParent(blockchain, genesisBlock); @@ -159,6 +159,22 @@ public void shouldRejectInvalidArguments() { mockExecutorService, new PruningConfiguration(-1, -2))) .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + new Pruner( + markSweepPruner, + mockchain, + mockExecutorService, + new PruningConfiguration(10, 8))) + .isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy( + () -> + new Pruner( + markSweepPruner, + mockchain, + mockExecutorService, + new PruningConfiguration(10, 10))) + .isInstanceOf(IllegalArgumentException.class); } @Test @@ -171,7 +187,7 @@ public void shouldCleanUpPruningStrategyOnShutdown() throws InterruptedException final Pruner pruner = new Pruner( - markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 0)); + markSweepPruner, blockchain, mockExecutorService, new PruningConfiguration(0, 1)); pruner.start(); pruner.stop(); verify(markSweepPruner).cleanup(); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java index 7b8efdc7aba..59ae98c626c 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java @@ -77,9 +77,15 @@ public Optional get(final byte[] key) throws StorageException { @Override public long removeAllKeysUnless(final Predicate retainCondition) throws StorageException { - long initialSize = hashValueStore.keySet().size(); - hashValueStore.keySet().removeIf(key -> !retainCondition.test(key.getArrayUnsafe())); - return initialSize - hashValueStore.keySet().size(); + final Lock lock = rwLock.writeLock(); + lock.lock(); + try { + long initialSize = hashValueStore.keySet().size(); + hashValueStore.keySet().removeIf(key -> !retainCondition.test(key.getArrayUnsafe())); + return initialSize - hashValueStore.keySet().size(); + } finally { + lock.unlock(); + } } @Override