diff --git a/CHANGELOG.md b/CHANGELOG.md index 5742ccc9378..ba1e2a46e23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - Increase the speed of modexp gas execution and execution. [#4780](https://github.com/hyperledger/besu/pull/4780) - Added experimental CLI options `--Xeth-capability-max` and `--Xeth-capability-min` to specify a range of capabilities to be supported by the Eth protocol. [#4752](https://github.com/hyperledger/besu/pull/4752) - Set the default curve in the EVMTool, like is done in production operations [#4790](https://github.com/hyperledger/besu/pull/4790) +- Add chain data pruning feature with three experimental CLI options: `--Xchain-pruning-enabled`, `--Xchain-pruning-blocks-retained` and `--Xchain-pruning-frequency` [#4686](https://github.com/hyperledger/besu/pull/4686) ### Bug Fixes - Fix storage key format for eth_getProof so that it follows the EIP-1474 spec [#4564](https://github.com/hyperledger/besu/pull/4564) diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index 3f79bc4279a..0521e147eab 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -56,6 +56,7 @@ import org.hyperledger.besu.cli.options.stable.LoggingLevelOption; import org.hyperledger.besu.cli.options.stable.NodePrivateKeyFileOption; import org.hyperledger.besu.cli.options.stable.P2PTLSConfigOptions; +import org.hyperledger.besu.cli.options.unstable.ChainPruningOptions; import org.hyperledger.besu.cli.options.unstable.DnsOptions; import org.hyperledger.besu.cli.options.unstable.EthProtocolOptions; import org.hyperledger.besu.cli.options.unstable.EvmOptions; @@ -289,6 +290,7 @@ public class BesuCommand implements DefaultCommandValues, Runnable { private final PrivacyPluginOptions unstablePrivacyPluginOptions = PrivacyPluginOptions.create(); private final EvmOptions unstableEvmOptions = EvmOptions.create(); private final IpcOptions unstableIpcOptions = IpcOptions.create(); + private final ChainPruningOptions unstableChainPruningOptions = ChainPruningOptions.create(); // stable CLI options private final DataStorageOptions dataStorageOptions = DataStorageOptions.create(); @@ -1539,6 +1541,7 @@ private void handleUnstableOptions() { .put("Launcher", unstableLauncherOptions) .put("EVM Options", unstableEvmOptions) .put("IPC Options", unstableIpcOptions) + .put("Chain Data Pruning Options", unstableChainPruningOptions) .build(); UnstableOptionsSubCommand.createUnstableOptions(commandLine, unstableOptions); @@ -1776,6 +1779,7 @@ private void validateOptions() { validateDnsOptionsParams(); ensureValidPeerBoundParams(); validateRpcOptionsParams(); + validateChainDataPruningParams(); p2pTLSConfigOptions.checkP2PTLSOptionsDependencies(logger, commandLine); pkiBlockCreationOptions.checkPkiBlockCreationOptionsDependencies(logger, commandLine); } @@ -1927,6 +1931,17 @@ public void validateRpcOptionsParams() { } } + public void validateChainDataPruningParams() { + if (unstableChainPruningOptions.getChainDataPruningEnabled() + && unstableChainPruningOptions.getChainDataPruningBlocksRetained() + < ChainPruningOptions.DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED) { + throw new ParameterException( + this.commandLine, + "--Xchain-pruning-blocks-retained must be >= " + + ChainPruningOptions.DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED); + } + } + private GenesisConfigOptions readGenesisConfigOptions() { try { @@ -2184,7 +2199,8 @@ public BesuControllerBuilder getControllerBuilder() { .reorgLoggingThreshold(reorgLoggingThreshold) .evmConfiguration(unstableEvmOptions.toDomainObject()) .dataStorageConfiguration(dataStorageOptions.toDomainObject()) - .maxPeers(p2PDiscoveryOptionGroup.maxPeers); + .maxPeers(p2PDiscoveryOptionGroup.maxPeers) + .chainPruningConfiguration(unstableChainPruningOptions.toDomainObject()); } private GraphQLConfiguration graphQLConfiguration() { diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/ChainPruningOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/ChainPruningOptions.java new file mode 100644 index 00000000000..5b90e9281d6 --- /dev/null +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/ChainPruningOptions.java @@ -0,0 +1,90 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ +package org.hyperledger.besu.cli.options.unstable; + +import org.hyperledger.besu.cli.options.CLIOptions; +import org.hyperledger.besu.ethereum.chain.ChainPrunerConfiguration; +import org.hyperledger.besu.util.number.PositiveNumber; + +import java.util.Arrays; +import java.util.List; + +import picocli.CommandLine; + +public class ChainPruningOptions implements CLIOptions { + private static final String CHAIN_PRUNING_ENABLED_FLAG = "--Xchain-pruning-enabled"; + private static final String CHAIN_PRUNING_BLOCKS_RETAINED_FLAG = + "--Xchain-pruning-blocks-retained"; + private static final String CHAIN_PRUNING_FREQUENCY_FLAG = "--Xchain-pruning-frequency"; + public static final long DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED = 7200; + public static final int DEFAULT_CHAIN_DATA_PRUNING_FREQUENCY = 256; + + @CommandLine.Option( + hidden = true, + names = {CHAIN_PRUNING_ENABLED_FLAG}, + description = + "Enable the chain pruner to actively prune old chain data (default: ${DEFAULT-VALUE})") + private final Boolean chainDataPruningEnabled = Boolean.FALSE; + + @CommandLine.Option( + hidden = true, + names = {CHAIN_PRUNING_BLOCKS_RETAINED_FLAG}, + description = + "The number of recent blocks for which to keep the chain data. Must be >= " + + DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED + + " (default: ${DEFAULT-VALUE})") + private final Long chainDataPruningBlocksRetained = + DEFAULT_CHAIN_DATA_PRUNING_MIN_BLOCKS_RETAINED; + + @CommandLine.Option( + hidden = true, + names = {CHAIN_PRUNING_FREQUENCY_FLAG}, + description = + "The number of blocks added to the chain between two pruning operations. Must be non-negative (default: ${DEFAULT-VALUE})") + private final PositiveNumber chainDataPruningBlocksFrequency = + PositiveNumber.fromInt(DEFAULT_CHAIN_DATA_PRUNING_FREQUENCY); + + public static ChainPruningOptions create() { + return new ChainPruningOptions(); + } + + public Boolean getChainDataPruningEnabled() { + return chainDataPruningEnabled; + } + + public Long getChainDataPruningBlocksRetained() { + return chainDataPruningBlocksRetained; + } + + @Override + public ChainPrunerConfiguration toDomainObject() { + return new ChainPrunerConfiguration( + chainDataPruningEnabled, + chainDataPruningBlocksRetained, + chainDataPruningBlocksFrequency.getValue()); + } + + @Override + public List getCLIOptions() { + return Arrays.asList( + CHAIN_PRUNING_ENABLED_FLAG, + chainDataPruningEnabled.toString(), + CHAIN_PRUNING_BLOCKS_RETAINED_FLAG, + chainDataPruningBlocksRetained.toString(), + CHAIN_PRUNING_FREQUENCY_FLAG, + chainDataPruningBlocksFrequency.toString()); + } +} diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 2482591f9d6..115fb4b5fa6 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -35,6 +35,9 @@ import org.hyperledger.besu.ethereum.bonsai.CachedMerkleTrieLoader; import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.chain.BlockchainStorage; +import org.hyperledger.besu.ethereum.chain.ChainDataPruner; +import org.hyperledger.besu.ethereum.chain.ChainDataPrunerStorage; +import org.hyperledger.besu.ethereum.chain.ChainPrunerConfiguration; import org.hyperledger.besu.ethereum.chain.DefaultBlockchain; import org.hyperledger.besu.ethereum.chain.GenesisState; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; @@ -51,6 +54,7 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthScheduler; import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter; +import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors; import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator; @@ -140,6 +144,7 @@ public abstract class BesuControllerBuilder implements MiningParameterOverrides Collections.emptyList(); protected EvmConfiguration evmConfiguration; protected int maxPeers; + protected ChainPrunerConfiguration chainPrunerConfiguration = ChainPrunerConfiguration.DEFAULT; public BesuControllerBuilder storageProvider(final StorageProvider storageProvider) { this.storageProvider = storageProvider; @@ -268,6 +273,12 @@ public BesuControllerBuilder maxPeers(final int maxPeers) { return this; } + public BesuControllerBuilder chainPruningConfiguration( + final ChainPrunerConfiguration chainPrunerConfiguration) { + this.chainPrunerConfiguration = chainPrunerConfiguration; + return this; + } + public BesuController build() { checkNotNull(genesisConfig, "Missing genesis config"); checkNotNull(syncConfig, "Missing sync config"); @@ -315,6 +326,22 @@ public BesuController build() { blockchain, worldStateArchive, protocolSchedule, this::createConsensusContext); validateContext(protocolContext); + if (chainPrunerConfiguration.getChainPruningEnabled()) { + protocolContext + .safeConsensusContext(MergeContext.class) + .ifPresent( + mergeContext -> { + mergeContext.setIsChainPruningEnabled(true); + }); + final ChainDataPruner chainDataPruner = createChainPruner(blockchainStorage); + blockchain.observeBlockAdded(chainDataPruner); + LOG.info( + "Chain data pruning enabled with recent blocks retained to be: " + + chainPrunerConfiguration.getChainPruningBlocksRetained() + + " and frequency to be: " + + chainPrunerConfiguration.getChainPruningBlocksFrequency()); + } + protocolSchedule.setPublicWorldStateArchiveForPrivacyBlockProcessor( protocolContext.getWorldStateArchive()); @@ -651,6 +678,22 @@ private WorldStateArchive createWorldStateArchive( } } + private ChainDataPruner createChainPruner(final BlockchainStorage blockchainStorage) { + return new ChainDataPruner( + blockchainStorage, + new ChainDataPrunerStorage( + storageProvider.getStorageBySegmentIdentifier( + KeyValueSegmentIdentifier.CHAIN_PRUNER_STATE)), + chainPrunerConfiguration.getChainPruningBlocksRetained(), + chainPrunerConfiguration.getChainPruningBlocksFrequency(), + MonitoredExecutors.newBoundedThreadPool( + ChainDataPruner.class.getSimpleName(), + 1, + 1, + ChainDataPruner.MAX_PRUNING_THREAD_QUEUE_SIZE, + metricsSystem)); + } + protected List createPeerValidators(final ProtocolSchedule protocolSchedule) { final List validators = new ArrayList<>(); diff --git a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java index af7f58000b7..d415f99cea8 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/CommandTestAbstract.java @@ -226,7 +226,7 @@ public void initMocks() throws Exception { when(mockControllerBuilder.dataStorageConfiguration(any())).thenReturn(mockControllerBuilder); when(mockControllerBuilder.evmConfiguration(any())).thenReturn(mockControllerBuilder); when(mockControllerBuilder.maxPeers(anyInt())).thenReturn(mockControllerBuilder); - + when(mockControllerBuilder.chainPruningConfiguration(any())).thenReturn(mockControllerBuilder); // doReturn used because of generic BesuController doReturn(mockController).when(mockControllerBuilder).build(); lenient().when(mockController.getProtocolManager()).thenReturn(mockEthProtocolManager); diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java index eb06c53fe9d..c1c217788fb 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/MergeContext.java @@ -65,4 +65,10 @@ void fireNewUnverifiedForkchoiceEvent( void putPayloadById(final PayloadIdentifier payloadId, final Block block); Optional retrieveBlockById(final PayloadIdentifier payloadId); + + default void setIsChainPruningEnabled(final boolean isChainPruningEnabled) {} + + default boolean isChainPruningEnabled() { + return false; + } } diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java index 2ce00d83cb2..c1f87fe846f 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/PostMergeContext.java @@ -64,6 +64,10 @@ public class PostMergeContext implements MergeContext { private final AtomicReference> terminalPoWBlock = new AtomicReference<>(Optional.empty()); + // TODO: cleanup - isChainPruningEnabled will not be required after + // https://github.com/hyperledger/besu/pull/4703 is merged. + private boolean isChainPruningEnabled = false; + @VisibleForTesting PostMergeContext() { this(Difficulty.ZERO); @@ -265,4 +269,14 @@ private static class PayloadTuple { this.block = block; } } + + @Override + public void setIsChainPruningEnabled(final boolean isChainPruningEnabled) { + this.isChainPruningEnabled = isChainPruningEnabled; + } + + @Override + public boolean isChainPruningEnabled() { + return isChainPruningEnabled; + } } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java index 64c427da1e8..787c3509071 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineForkchoiceUpdated.java @@ -119,7 +119,8 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext) } // TODO: post-merge cleanup, this should be unnecessary after merge - if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newHead)) { + if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newHead) + && !mergeContext.get().isChainPruningEnabled()) { logForkchoiceUpdatedCall(INVALID, forkChoice); return new JsonRpcSuccessResponse( requestId, diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineNewPayload.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineNewPayload.java index 3b495b10135..ff3c8eab554 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineNewPayload.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/engine/EngineNewPayload.java @@ -196,7 +196,8 @@ public JsonRpcResponse syncResponse(final JsonRpcRequestContext requestContext) } // TODO: post-merge cleanup - if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newBlockHeader)) { + if (!mergeCoordinator.latestValidAncestorDescendsFromTerminal(newBlockHeader) + && !mergeContext.get().isChainPruningEnabled()) { mergeCoordinator.addBadBlock(block, Optional.empty()); return respondWithInvalid( reqId, diff --git a/ethereum/core/build.gradle b/ethereum/core/build.gradle index cab2d0769ff..e835c828d66 100644 --- a/ethereum/core/build.gradle +++ b/ethereum/core/build.gradle @@ -81,6 +81,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter' testImplementation 'org.junit.jupiter:junit-jupiter-params' testImplementation 'org.mockito:mockito-core' + testImplementation 'org.awaitility:awaitility' testRuntimeOnly 'org.junit.vintage:junit-vintage-engine' diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockchainStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockchainStorage.java index 4f4aa87a23c..c610a3e0d2b 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockchainStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/BlockchainStorage.java @@ -72,8 +72,16 @@ interface Updater { void removeBlockHash(long blockNumber); + void removeBlockHeader(final Hash blockHash); + + void removeBlockBody(final Hash blockHash); + + void removeTransactionReceipts(final Hash blockHash); + void removeTransactionLocation(Hash transactionHash); + void removeTotalDifficulty(final Hash blockHash); + void commit(); void rollback(); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPruner.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPruner.java new file mode 100644 index 00000000000..7807786948d --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPruner.java @@ -0,0 +1,111 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ +package org.hyperledger.besu.ethereum.chain; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; + +import java.util.Collection; +import java.util.concurrent.ExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ChainDataPruner implements BlockAddedObserver { + public static final int MAX_PRUNING_THREAD_QUEUE_SIZE = 16; + private static final Logger LOG = LoggerFactory.getLogger(ChainDataPruner.class); + private final BlockchainStorage blockchainStorage; + private final ChainDataPrunerStorage prunerStorage; + private final long blocksToRetain; + private final long pruningFrequency; + private final ExecutorService pruningExecutor; + + public ChainDataPruner( + final BlockchainStorage blockchainStorage, + final ChainDataPrunerStorage prunerStorage, + final long blocksToRetain, + final long pruningFrequency, + final ExecutorService pruningExecutor) { + this.blockchainStorage = blockchainStorage; + this.prunerStorage = prunerStorage; + this.blocksToRetain = blocksToRetain; + this.pruningFrequency = pruningFrequency; + this.pruningExecutor = pruningExecutor; + } + + @Override + public void onBlockAdded(final BlockAddedEvent event) { + final long blockNumber = event.getBlock().getHeader().getNumber(); + final long storedPruningMark = prunerStorage.getPruningMark().orElse(blockNumber); + if (blockNumber < storedPruningMark) { + LOG.warn( + "Block added event: " + + event + + " has a block number of " + + blockNumber + + " < pruning mark " + + storedPruningMark + + " which normally indicates chain-pruning-blocks-retained is too small"); + return; + } + final KeyValueStorageTransaction recordBlockHashesTransaction = + prunerStorage.startTransaction(); + final Collection forkBlocks = prunerStorage.getForkBlocks(blockNumber); + forkBlocks.add(event.getBlock().getHash()); + prunerStorage.setForkBlocks(recordBlockHashesTransaction, blockNumber, forkBlocks); + recordBlockHashesTransaction.commit(); + + pruningExecutor.submit( + () -> { + final KeyValueStorageTransaction pruningTransaction = prunerStorage.startTransaction(); + long currentPruningMark = storedPruningMark; + final long newPruningMark = blockNumber - blocksToRetain; + final long blocksToBePruned = newPruningMark - currentPruningMark; + if (event.isNewCanonicalHead() && blocksToBePruned >= pruningFrequency) { + long currentRetainedBlock = blockNumber - currentPruningMark + 1; + while (currentRetainedBlock > blocksToRetain) { + LOG.debug("Pruning chain data with block height of " + currentPruningMark); + pruneChainDataAtBlock(pruningTransaction, currentPruningMark); + currentPruningMark++; + currentRetainedBlock = blockNumber - currentPruningMark; + } + } + prunerStorage.setPruningMark(pruningTransaction, currentPruningMark); + pruningTransaction.commit(); + }); + } + + private void pruneChainDataAtBlock(final KeyValueStorageTransaction tx, final long blockNumber) { + final Collection oldForkBlocks = prunerStorage.getForkBlocks(blockNumber); + final BlockchainStorage.Updater updater = blockchainStorage.updater(); + for (final Hash toPrune : oldForkBlocks) { + updater.removeBlockHeader(toPrune); + updater.removeBlockBody(toPrune); + updater.removeTransactionReceipts(toPrune); + updater.removeTotalDifficulty(toPrune); + blockchainStorage + .getBlockBody(toPrune) + .ifPresent( + blockBody -> + blockBody + .getTransactions() + .forEach(t -> updater.removeTransactionLocation(t.getHash()))); + } + updater.removeBlockHash(blockNumber); + updater.commit(); + prunerStorage.removeForkBlocks(tx, blockNumber); + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerStorage.java new file mode 100644 index 00000000000..8b50ec83d52 --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerStorage.java @@ -0,0 +1,99 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ +package org.hyperledger.besu.ethereum.chain; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.rlp.RLP; +import org.hyperledger.besu.plugin.services.storage.KeyValueStorage; +import org.hyperledger.besu.plugin.services.storage.KeyValueStorageTransaction; + +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Optional; + +import com.google.common.collect.Lists; +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; +import org.apache.tuweni.units.bigints.UInt256; + +public class ChainDataPrunerStorage { + private static final Bytes PRUNING_MARK_KEY = + Bytes.wrap("pruningMark".getBytes(StandardCharsets.UTF_8)); + + private static final Bytes VARIABLES_PREFIX = Bytes.of(1); + private static final Bytes FORK_BLOCKS_PREFIX = Bytes.of(2); + + private final KeyValueStorage storage; + + public ChainDataPrunerStorage(final KeyValueStorage storage) { + this.storage = storage; + } + + public KeyValueStorageTransaction startTransaction() { + return this.storage.startTransaction(); + } + + public Optional getPruningMark() { + return get(VARIABLES_PREFIX, PRUNING_MARK_KEY).map(UInt256::fromBytes).map(UInt256::toLong); + } + + public Collection getForkBlocks(final long blockNumber) { + return get(FORK_BLOCKS_PREFIX, UInt256.valueOf(blockNumber)) + .map(bytes -> RLP.input(bytes).readList(in -> bytesToHash(in.readBytes32()))) + .orElse(Lists.newArrayList()); + } + + public void setPruningMark(final KeyValueStorageTransaction transaction, final long pruningMark) { + set(transaction, VARIABLES_PREFIX, PRUNING_MARK_KEY, UInt256.valueOf(pruningMark)); + } + + public void setForkBlocks( + final KeyValueStorageTransaction transaction, + final long blockNumber, + final Collection forkBlocks) { + set( + transaction, + FORK_BLOCKS_PREFIX, + UInt256.valueOf(blockNumber), + RLP.encode(o -> o.writeList(forkBlocks, (val, out) -> out.writeBytes(val)))); + } + + public void removeForkBlocks( + final KeyValueStorageTransaction transaction, final long blockNumber) { + remove(transaction, FORK_BLOCKS_PREFIX, UInt256.valueOf(blockNumber)); + } + + private Optional get(final Bytes prefix, final Bytes key) { + return storage.get(Bytes.concatenate(prefix, key).toArrayUnsafe()).map(Bytes::wrap); + } + + private void set( + final KeyValueStorageTransaction transaction, + final Bytes prefix, + final Bytes key, + final Bytes value) { + transaction.put(Bytes.concatenate(prefix, key).toArrayUnsafe(), value.toArrayUnsafe()); + } + + private void remove( + final KeyValueStorageTransaction transaction, final Bytes prefix, final Bytes key) { + transaction.remove(Bytes.concatenate(prefix, key).toArrayUnsafe()); + } + + private Hash bytesToHash(final Bytes bytes) { + return Hash.wrap(Bytes32.wrap(bytes, 0)); + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainPrunerConfiguration.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainPrunerConfiguration.java new file mode 100644 index 00000000000..99ac9bc486d --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/ChainPrunerConfiguration.java @@ -0,0 +1,43 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ +package org.hyperledger.besu.ethereum.chain; + +public class ChainPrunerConfiguration { + public static final ChainPrunerConfiguration DEFAULT = + new ChainPrunerConfiguration(false, 7200, 256); + private final boolean enabled; + private final long blocksRetained; + private final long blocksFrequency; + + public ChainPrunerConfiguration( + final boolean enabled, final long blocksRetained, final long blocksFrequency) { + this.enabled = enabled; + this.blocksRetained = blocksRetained; + this.blocksFrequency = blocksFrequency; + } + + public long getChainPruningBlocksRetained() { + return blocksRetained; + } + + public boolean getChainPruningEnabled() { + return enabled; + } + + public long getChainPruningBlocksFrequency() { + return blocksFrequency; + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java index 316316fcc7c..87a3c0f89a8 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java @@ -35,7 +35,8 @@ public enum KeyValueSegmentIdentifier implements SegmentIdentifier { BACKWARD_SYNC_BLOCKS(new byte[] {14}), BACKWARD_SYNC_CHAIN(new byte[] {15}), SNAPSYNC_MISSING_ACCOUNT_RANGE(new byte[] {16}), - SNAPSYNC_ACCOUNT_TO_FIX(new byte[] {17}); + SNAPSYNC_ACCOUNT_TO_FIX(new byte[] {17}), + CHAIN_PRUNER_STATE(new byte[] {18}); private final byte[] id; private final int[] versionList; diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueStoragePrefixedKeyBlockchainStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueStoragePrefixedKeyBlockchainStorage.java index f638bb4d906..8e26ede5f60 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueStoragePrefixedKeyBlockchainStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueStoragePrefixedKeyBlockchainStorage.java @@ -203,11 +203,31 @@ public void removeBlockHash(final long blockNumber) { remove(BLOCK_HASH_PREFIX, UInt256.valueOf(blockNumber)); } + @Override + public void removeBlockHeader(final Hash blockHash) { + remove(BLOCK_HEADER_PREFIX, blockHash); + } + + @Override + public void removeBlockBody(final Hash blockHash) { + remove(BLOCK_BODY_PREFIX, blockHash); + } + + @Override + public void removeTransactionReceipts(final Hash blockHash) { + remove(TRANSACTION_RECEIPTS_PREFIX, blockHash); + } + @Override public void removeTransactionLocation(final Hash transactionHash) { remove(TRANSACTION_LOCATION_PREFIX, transactionHash); } + @Override + public void removeTotalDifficulty(final Hash blockHash) { + remove(TOTAL_DIFFICULTY_PREFIX, blockHash); + } + @Override public void commit() { transaction.commit(); diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerTest.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerTest.java new file mode 100644 index 00000000000..93e4005cb9b --- /dev/null +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/chain/ChainDataPrunerTest.java @@ -0,0 +1,140 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ +package org.hyperledger.besu.ethereum.chain; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.chain.ChainDataPruner.MAX_PRUNING_THREAD_QUEUE_SIZE; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; +import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; +import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +public class ChainDataPrunerTest { + + @Test + public void singleChainPruning() { + final BlockDataGenerator gen = new BlockDataGenerator(); + final BlockchainStorage blockchainStorage = + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); + final ChainDataPruner chainDataPruner = + new ChainDataPruner( + blockchainStorage, + new ChainDataPrunerStorage(new InMemoryKeyValueStorage()), + 512, + 0, + new ThreadPoolExecutor( + 1, + 1, + 60L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(MAX_PRUNING_THREAD_QUEUE_SIZE), + new ThreadPoolExecutor.DiscardPolicy())); + Block genesisBlock = gen.genesisBlock(); + final MutableBlockchain blockchain = + DefaultBlockchain.createMutable( + genesisBlock, blockchainStorage, new NoOpMetricsSystem(), 0); + blockchain.observeBlockAdded(chainDataPruner); + + // Generate & Import 1000 blocks + gen.blockSequence(genesisBlock, 1000) + .forEach( + blk -> { + blockchain.appendBlock(blk, gen.receipts(blk)); + long number = blk.getHeader().getNumber(); + if (number <= 512) { + // No prune happened + assertThat(blockchain.getBlockHeader(1)).isPresent(); + } else { + // Prune number - 512 only + Awaitility.await() + .pollInterval(1, TimeUnit.MILLISECONDS) + .atMost(50, TimeUnit.MILLISECONDS) + .until(() -> blockchain.getBlockHeader(number - 512).isEmpty()); + assertThat(blockchain.getBlockHeader(number - 511)).isPresent(); + } + }); + } + + @Test + public void forkPruning() { + final BlockDataGenerator gen = new BlockDataGenerator(); + final BlockchainStorage blockchainStorage = + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()); + final ChainDataPruner chainDataPruner = + new ChainDataPruner( + blockchainStorage, + new ChainDataPrunerStorage(new InMemoryKeyValueStorage()), + 512, + 0, + new ThreadPoolExecutor( + 1, + 1, + 60L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(MAX_PRUNING_THREAD_QUEUE_SIZE), + new ThreadPoolExecutor.DiscardPolicy())); + Block genesisBlock = gen.genesisBlock(); + final MutableBlockchain blockchain = + DefaultBlockchain.createMutable( + genesisBlock, blockchainStorage, new NoOpMetricsSystem(), 0); + blockchain.observeBlockAdded(chainDataPruner); + + List canonicalChain = gen.blockSequence(genesisBlock, 1000); + List forkChain = gen.blockSequence(genesisBlock, 16); + for (Block blk : forkChain) { + blockchain.storeBlock(blk, gen.receipts(blk)); + } + for (int i = 0; i < 512; i++) { + Block blk = canonicalChain.get(i); + blockchain.appendBlock(blk, gen.receipts(blk)); + } + // No prune happened + assertThat(blockchain.getBlockByHash(canonicalChain.get(0).getHash())).isPresent(); + assertThat(blockchain.getBlockByHash(forkChain.get(0).getHash())).isPresent(); + for (int i = 512; i < 527; i++) { + final int index = i; + Block blk = canonicalChain.get(i); + blockchain.appendBlock(blk, gen.receipts(blk)); + // Prune block on canonical chain and fork for i - 512 only + Awaitility.await() + .pollInterval(1, TimeUnit.MILLISECONDS) + .atMost(50, TimeUnit.MILLISECONDS) + .until( + () -> blockchain.getBlockByHash(canonicalChain.get(index - 512).getHash()).isEmpty()); + assertThat(blockchain.getBlockByHash(canonicalChain.get(i - 511).getHash())).isPresent(); + Awaitility.await() + .pollInterval(1, TimeUnit.MILLISECONDS) + .atMost(50, TimeUnit.MILLISECONDS) + .until( + () -> blockchain.getBlockByHash(canonicalChain.get(index - 512).getHash()).isEmpty()); + + assertThat(blockchain.getBlockByHash(forkChain.get(i - 511).getHash())).isPresent(); + } + } +}