From 371f5e706ee043533cb5e537981a1d30ac58c2ab Mon Sep 17 00:00:00 2001 From: Karim TAAM Date: Mon, 9 Oct 2023 17:32:33 +0200 Subject: [PATCH 1/8] add retry logic for sync pipeline with rocksdb issue Signed-off-by: Karim TAAM --- .../eth/sync/StorageExceptionManager.java | 15 +++++ .../fastsync/worldstate/PersistDataStep.java | 49 +++++++++----- .../eth/sync/snapsync/LoadLocalDataStep.java | 39 +++++++---- .../eth/sync/snapsync/PersistDataStep.java | 67 +++++++++++-------- .../SnapWorldStateDownloadProcess.java | 16 +++++ .../ethereum/eth/sync/snapsync/StackTrie.java | 14 +++- .../request/AccountRangeDataRequest.java | 6 ++ .../snapsync/request/BytecodeRequest.java | 5 ++ .../request/StorageRangeDataRequest.java | 6 ++ .../request/heal/TrieNodeHealingRequest.java | 5 ++ 10 files changed, 162 insertions(+), 60 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java new file mode 100644 index 00000000000..9b47b50cb3b --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java @@ -0,0 +1,15 @@ +package org.hyperledger.besu.ethereum.eth.sync; + +import org.hyperledger.besu.plugin.services.exception.StorageException; + +public final class StorageExceptionManager { + + private static final String rocksdbClassName = "org.rocksdb.RocksDBException"; + private static final String ERR_BUSY = "Busy"; + private static final String ERR_LOCK_TIMED_OUT = "TimedOut(LockTimeout)"; + + public static boolean canRetryOnError(final StorageException e) { + return e.getMessage().contains(rocksdbClassName) + && (e.getMessage().contains(ERR_BUSY) || e.getMessage().contains(ERR_LOCK_TIMED_OUT)); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java index 94592b98d81..458242c1767 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java @@ -14,10 +14,13 @@ */ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; + import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater; +import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.services.tasks.Task; import java.util.List; @@ -33,24 +36,34 @@ public List> persist( final List> tasks, final BlockHeader blockHeader, final WorldDownloadState downloadState) { - final Updater updater = worldStateStorage.updater(); - tasks.stream() - .map( - task -> { - enqueueChildren(task, downloadState); - return task; - }) - .map(Task::getData) - .filter(request -> request.getData() != null) - .forEach( - request -> { - if (isRootState(blockHeader, request)) { - downloadState.setRootNodeData(request.getData()); - } else { - request.persist(updater); - } - }); - updater.commit(); + try { + final Updater updater = worldStateStorage.updater(); + tasks.stream() + .map( + task -> { + enqueueChildren(task, downloadState); + return task; + }) + .map(Task::getData) + .filter(request -> request.getData() != null) + .forEach( + request -> { + if (isRootState(blockHeader, request)) { + downloadState.setRootNodeData(request.getData()); + } else { + request.persist(updater); + } + }); + updater.commit(); + } catch (StorageException storageException) { + if (canRetryOnError(storageException)) { + // We reset the task by setting it to null. This way, it is considered as failed by the + // pipeline, and it will attempt to execute it again later. + tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null)); + } else { + throw storageException; + } + } return tasks; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java index 5264eac2b50..ee6c1175bce 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java @@ -14,11 +14,14 @@ */ package org.hyperledger.besu.ethereum.eth.sync.snapsync; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; + import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.metrics.Counter; import org.hyperledger.besu.services.pipeline.Pipe; import org.hyperledger.besu.services.tasks.Task; @@ -58,19 +61,29 @@ public Stream> loadLocalDataTrieNode( final Task task, final Pipe> completedTasks) { final TrieNodeHealingRequest request = (TrieNodeHealingRequest) task.getData(); // check if node is already stored in the worldstate - if (snapSyncState.hasPivotBlockHeader()) { - Optional existingData = request.getExistingData(downloadState, worldStateStorage); - if (existingData.isPresent()) { - existingNodeCounter.inc(); - request.setData(existingData.get()); - request.setRequiresPersisting(false); - final WorldStateStorage.Updater updater = worldStateStorage.updater(); - request.persist( - worldStateStorage, updater, downloadState, snapSyncState, snapSyncConfiguration); - updater.commit(); - downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage)); - completedTasks.put(task); - return Stream.empty(); + try { + if (snapSyncState.hasPivotBlockHeader()) { + Optional existingData = request.getExistingData(downloadState, worldStateStorage); + if (existingData.isPresent()) { + existingNodeCounter.inc(); + request.setData(existingData.get()); + request.setRequiresPersisting(false); + final WorldStateStorage.Updater updater = worldStateStorage.updater(); + request.persist( + worldStateStorage, updater, downloadState, snapSyncState, snapSyncConfiguration); + updater.commit(); + downloadState.enqueueRequests(request.getRootStorageRequests(worldStateStorage)); + completedTasks.put(task); + return Stream.empty(); + } + } + } catch (StorageException storageException) { + if (canRetryOnError(storageException)) { + // We reset the task by setting it to null. This way, it is considered as failed by the + // pipeline, and it will attempt to execute it again later. + task.getData().clear(); + } else { + throw storageException; } } return Stream.of(task); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java index 6a39f648716..157598a38c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java @@ -14,10 +14,13 @@ */ package org.hyperledger.besu.ethereum.eth.sync.snapsync; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; + import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; +import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.services.tasks.Task; import java.util.List; @@ -43,41 +46,51 @@ public PersistDataStep( } public List> persist(final List> tasks) { - final WorldStateStorage.Updater updater = worldStateStorage.updater(); - for (Task task : tasks) { - if (task.getData().isResponseReceived()) { - // enqueue child requests - final Stream childRequests = - task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState); - if (!(task.getData() instanceof TrieNodeHealingRequest)) { - enqueueChildren(childRequests); - } else { - if (!task.getData().isExpired(snapSyncState)) { + try { + final WorldStateStorage.Updater updater = worldStateStorage.updater(); + for (Task task : tasks) { + if (task.getData().isResponseReceived()) { + // enqueue child requests + final Stream childRequests = + task.getData().getChildRequests(downloadState, worldStateStorage, snapSyncState); + if (!(task.getData() instanceof TrieNodeHealingRequest)) { enqueueChildren(childRequests); } else { - continue; + if (!task.getData().isExpired(snapSyncState)) { + enqueueChildren(childRequests); + } else { + continue; + } } - } - // persist nodes - final int persistedNodes = - task.getData() - .persist( - worldStateStorage, - updater, - downloadState, - snapSyncState, - snapSyncConfiguration); - if (persistedNodes > 0) { - if (task.getData() instanceof TrieNodeHealingRequest) { - downloadState.getMetricsManager().notifyTrieNodesHealed(persistedNodes); - } else { - downloadState.getMetricsManager().notifyNodesGenerated(persistedNodes); + // persist nodes + final int persistedNodes = + task.getData() + .persist( + worldStateStorage, + updater, + downloadState, + snapSyncState, + snapSyncConfiguration); + if (persistedNodes > 0) { + if (task.getData() instanceof TrieNodeHealingRequest) { + downloadState.getMetricsManager().notifyTrieNodesHealed(persistedNodes); + } else { + downloadState.getMetricsManager().notifyNodesGenerated(persistedNodes); + } } } } + updater.commit(); + } catch (StorageException storageException) { + if (canRetryOnError(storageException)) { + // We reset the task by setting it to null. This way, it is considered as failed by the + // pipeline, and it will attempt to execute it again later. + tasks.forEach(task -> task.getData().clear()); + } else { + throw storageException; + } } - updater.commit(); return tasks; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java index c8afc582b72..c19ae6facc7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java @@ -231,6 +231,22 @@ public SnapWorldStateDownloadProcess build() { "step", "action"); + /* + The logic and intercommunication of different pipelines can be summarized as follows: + + 1. Account Data Pipeline (fetchAccountDataPipeline): This process starts with downloading the leaves of the account tree in ranges, with multiple ranges being processed simultaneously. + If the downloaded accounts are smart contracts, tasks are created in the storage pipeline to download the storage tree of the smart contract, and in the code download pipeline for the smart contract. + + 2. Storage Data Pipeline (fetchStorageDataPipeline): Running parallel to the account data pipeline, this pipeline downloads the storage of smart contracts. + If all slots cannot be downloaded at once, tasks are created in the fetchLargeStorageDataPipeline to download the storage by range, allowing parallelization of large account downloads. + + 3. Code Data Pipeline (fetchCodePipeline): This pipeline, running concurrently with the account and storage data pipelines, is responsible for downloading the code of the smart contracts. + + 4. Large Storage Data Pipeline (fetchLargeStorageDataPipeline): This pipeline is used when the storage data for a smart contract is too large to be downloaded at once. + It enables the storage data to be downloaded in ranges, similar to the account data. + + 5. Healing Phase: Initiated after all other pipelines have completed their tasks, this phase ensures the integrity and completeness of the downloaded data. + */ final Pipeline> completionPipeline = PipelineBuilder.>createPipeline( "requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java index fd6acde3754..1ab3ec3defb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java @@ -51,8 +51,8 @@ public class StackTrie { private final AtomicInteger nbSegments; private final int maxSegments; private final Bytes32 startKeyHash; - private final Map elements; - private final AtomicLong elementsCount; + private Map elements; + private AtomicLong elementsCount; public StackTrie(final Hash rootHash, final Bytes32 startKeyHash) { this(rootHash, 1, 1, startKeyHash); @@ -78,6 +78,11 @@ public void addElement( taskIdentifier, ImmutableTaskElement.builder().proofs(proofs).keys(keys).build()); } + public void removeElement(final Bytes32 taskIdentifier) { + ; + this.elementsCount.addAndGet(-this.elements.remove(taskIdentifier).keys().size()); + } + public TaskElement getElement(final Bytes32 taskIdentifier) { return this.elements.get(taskIdentifier); } @@ -142,6 +147,11 @@ public void maybeStoreNode(final Bytes location, final Node node) { } } + public void clear() { + this.elements = new LinkedHashMap<>(); + this.elementsCount = new AtomicLong(); + } + public boolean addSegment() { if (nbSegments.get() > maxSegments) { return false; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java index a1a6bc63da4..06181fd09f1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java @@ -216,6 +216,12 @@ public TreeMap getAccounts() { return stackTrie.getElement(startKeyHash).keys(); } + @Override + public void clear() { + stackTrie.clear(); + isProofValid = Optional.of(false); + } + public Bytes serialize() { return RLP.encode( out -> { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BytecodeRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BytecodeRequest.java index 96673d6f874..5db5ec0211e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BytecodeRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/BytecodeRequest.java @@ -88,6 +88,11 @@ public Bytes32 getAccountHash() { return accountHash; } + @Override + public void clear() { + setCode(Bytes.EMPTY); + } + public Bytes32 getCodeHash() { return codeHash; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java index c18d063d74d..14839f0ad6f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java @@ -205,6 +205,12 @@ public Bytes32 getEndKeyHash() { return endKeyHash; } + @Override + public void clear() { + this.isProofValid = Optional.of(false); + this.stackTrie.removeElement(startKeyHash); + } + @VisibleForTesting public void setProofValid(final boolean isProofValid) { this.isProofValid = Optional.of(isProofValid); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java index c04066141d8..ef7191a0167 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/TrieNodeHealingRequest.java @@ -123,6 +123,11 @@ public boolean isResponseReceived() { return !data.isEmpty() && Hash.hash(data).equals(getNodeHash()); } + @Override + public void clear() { + setData(Bytes.EMPTY); + } + @Override public boolean isExpired(final SnapSyncProcessState snapSyncState) { return snapSyncState.isExpired(this); From 5bd41c2ad514134aaab01e1056baa1b4e9b4b83b Mon Sep 17 00:00:00 2001 From: Karim TAAM Date: Mon, 9 Oct 2023 18:00:34 +0200 Subject: [PATCH 2/8] add logs Signed-off-by: Karim TAAM --- .../eth/sync/fastsync/worldstate/PersistDataStep.java | 7 +++++++ .../besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java | 4 ++++ .../besu/ethereum/eth/sync/snapsync/PersistDataStep.java | 5 +++++ 3 files changed, 16 insertions(+) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java index 458242c1767..79d13b15169 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java @@ -25,7 +25,13 @@ import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class PersistDataStep { + + private static final Logger LOG = LoggerFactory.getLogger(PersistDataStep.class); + private final WorldStateStorage worldStateStorage; public PersistDataStep(final WorldStateStorage worldStateStorage) { @@ -59,6 +65,7 @@ public List> persist( if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. + LOG.info("retry on rocksdb issue " + storageException.getMessage()); tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null)); } else { throw storageException; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java index ee6c1175bce..efa2902df9c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java @@ -30,9 +30,12 @@ import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LoadLocalDataStep { + private static final Logger LOG = LoggerFactory.getLogger(LoadLocalDataStep.class); private final WorldStateStorage worldStateStorage; private final SnapWorldDownloadState downloadState; private final SnapSyncProcessState snapSyncState; @@ -81,6 +84,7 @@ public Stream> loadLocalDataTrieNode( if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. + LOG.info("retry on rocksdb issue " + storageException.getMessage()); task.getData().clear(); } else { throw storageException; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java index 157598a38c3..71f95e96e05 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java @@ -26,7 +26,11 @@ import java.util.List; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class PersistDataStep { + private static final Logger LOG = LoggerFactory.getLogger(PersistDataStep.class); private final SnapSyncProcessState snapSyncState; private final WorldStateStorage worldStateStorage; @@ -86,6 +90,7 @@ public List> persist(final List> tas if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. + LOG.info("retry on rocksdb issue " + storageException.getMessage()); tasks.forEach(task -> task.getData().clear()); } else { throw storageException; From 7e7a3a7c6f9e661ebf942a7a70d5fd6cd2006264 Mon Sep 17 00:00:00 2001 From: Karim TAAM Date: Tue, 10 Oct 2023 09:36:35 +0200 Subject: [PATCH 3/8] fix issue during retry Signed-off-by: Karim TAAM --- .../eth/sync/StorageExceptionManager.java | 16 +++++++++++++++- .../ethereum/eth/sync/snapsync/StackTrie.java | 1 - .../request/AccountRangeDataRequest.java | 6 ++++-- .../request/StorageRangeDataRequest.java | 6 ++++-- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java index 9b47b50cb3b..23a490ba393 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java @@ -1,10 +1,24 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ package org.hyperledger.besu.ethereum.eth.sync; import org.hyperledger.besu.plugin.services.exception.StorageException; public final class StorageExceptionManager { - private static final String rocksdbClassName = "org.rocksdb.RocksDBException"; + private static final String rocksdbClassName = "RocksDBException"; private static final String ERR_BUSY = "Busy"; private static final String ERR_LOCK_TIMED_OUT = "TimedOut(LockTimeout)"; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java index 1ab3ec3defb..b6661036b7f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java @@ -79,7 +79,6 @@ public void addElement( } public void removeElement(final Bytes32 taskIdentifier) { - ; this.elementsCount.addAndGet(-this.elements.remove(taskIdentifier).keys().size()); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java index 06181fd09f1..6d34e9953ba 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java @@ -218,8 +218,10 @@ public TreeMap getAccounts() { @Override public void clear() { - stackTrie.clear(); - isProofValid = Optional.of(false); + if (isResponseReceived()) { + stackTrie.clear(); + isProofValid = Optional.of(false); + } } public Bytes serialize() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java index 14839f0ad6f..e972446edc0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java @@ -207,8 +207,10 @@ public Bytes32 getEndKeyHash() { @Override public void clear() { - this.isProofValid = Optional.of(false); - this.stackTrie.removeElement(startKeyHash); + if (isResponseReceived()) { + this.isProofValid = Optional.of(false); + this.stackTrie.removeElement(startKeyHash); + } } @VisibleForTesting From 2ff1aa67da02317f9d41720f02cc909a334e5a5b Mon Sep 17 00:00:00 2001 From: Karim TAAM Date: Tue, 10 Oct 2023 12:31:25 +0200 Subject: [PATCH 4/8] fix issue during retry Signed-off-by: Karim TAAM --- .../besu/ethereum/eth/sync/snapsync/StackTrie.java | 4 +++- .../eth/sync/snapsync/request/AccountRangeDataRequest.java | 6 ++---- .../eth/sync/snapsync/request/StorageRangeDataRequest.java | 6 ++---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java index b6661036b7f..01f17eb79b4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java @@ -79,7 +79,9 @@ public void addElement( } public void removeElement(final Bytes32 taskIdentifier) { - this.elementsCount.addAndGet(-this.elements.remove(taskIdentifier).keys().size()); + if (this.elements.containsKey(taskIdentifier)) { + this.elementsCount.addAndGet(-this.elements.remove(taskIdentifier).keys().size()); + } } public TaskElement getElement(final Bytes32 taskIdentifier) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java index 6d34e9953ba..06181fd09f1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java @@ -218,10 +218,8 @@ public TreeMap getAccounts() { @Override public void clear() { - if (isResponseReceived()) { - stackTrie.clear(); - isProofValid = Optional.of(false); - } + stackTrie.clear(); + isProofValid = Optional.of(false); } public Bytes serialize() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java index e972446edc0..14839f0ad6f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java @@ -207,10 +207,8 @@ public Bytes32 getEndKeyHash() { @Override public void clear() { - if (isResponseReceived()) { - this.isProofValid = Optional.of(false); - this.stackTrie.removeElement(startKeyHash); - } + this.isProofValid = Optional.of(false); + this.stackTrie.removeElement(startKeyHash); } @VisibleForTesting From 67cf31eaf2ee61417d004af5c1b427635828713e Mon Sep 17 00:00:00 2001 From: matkt Date: Mon, 16 Oct 2023 08:58:44 +0200 Subject: [PATCH 5/8] Update ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java Signed-off-by: garyschulte Co-authored-by: garyschulte Signed-off-by: matkt --- .../besu/ethereum/eth/sync/StorageExceptionManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java index 23a490ba393..51d69384edc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java @@ -19,8 +19,8 @@ public final class StorageExceptionManager { private static final String rocksdbClassName = "RocksDBException"; - private static final String ERR_BUSY = "Busy"; - private static final String ERR_LOCK_TIMED_OUT = "TimedOut(LockTimeout)"; + EnumSet RETRYABLE_STATUS_CODES = + EnumSet.of(Status.Code.TimedOut, Status.Code.TryAgain, Status.Code.Busy); public static boolean canRetryOnError(final StorageException e) { return e.getMessage().contains(rocksdbClassName) From d18cd854450c915fbbc99d06356244b9e83bef6d Mon Sep 17 00:00:00 2001 From: matkt Date: Mon, 16 Oct 2023 08:59:02 +0200 Subject: [PATCH 6/8] Update ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java Signed-off-by: garyschulte Co-authored-by: garyschulte Signed-off-by: matkt --- .../besu/ethereum/eth/sync/StorageExceptionManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java index 51d69384edc..08de3695bfb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java @@ -15,7 +15,8 @@ package org.hyperledger.besu.ethereum.eth.sync; import org.hyperledger.besu.plugin.services.exception.StorageException; - +import org.rocksdb.RocksDBException; +import org.rocksdb.Status; public final class StorageExceptionManager { private static final String rocksdbClassName = "RocksDBException"; From feff9ccd49feffd36aa7ba1bd501b96de4e54b81 Mon Sep 17 00:00:00 2001 From: Karim TAAM Date: Mon, 16 Oct 2023 09:38:12 +0200 Subject: [PATCH 7/8] refactor after review Signed-off-by: Karim TAAM --- ethereum/eth/build.gradle | 1 + .../eth/sync/StorageExceptionManager.java | 26 ++++++++++++++++--- .../fastsync/worldstate/PersistDataStep.java | 2 +- .../eth/sync/snapsync/LoadLocalDataStep.java | 2 +- .../eth/sync/snapsync/PersistDataStep.java | 2 +- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/ethereum/eth/build.gradle b/ethereum/eth/build.gradle index a40b49afffb..1919efdd4f2 100644 --- a/ethereum/eth/build.gradle +++ b/ethereum/eth/build.gradle @@ -58,6 +58,7 @@ dependencies { implementation 'io.tmio:tuweni-bytes' implementation 'io.tmio:tuweni-units' implementation 'io.tmio:tuweni-rlp' + implementation 'org.rocksdb:rocksdbjni' annotationProcessor "org.immutables:value" implementation "org.immutables:value-annotations" diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java index 08de3695bfb..97fb94d2919 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java @@ -15,16 +15,34 @@ package org.hyperledger.besu.ethereum.eth.sync; import org.hyperledger.besu.plugin.services.exception.StorageException; + +import java.util.EnumSet; +import java.util.Optional; + import org.rocksdb.RocksDBException; import org.rocksdb.Status; + public final class StorageExceptionManager { - private static final String rocksdbClassName = "RocksDBException"; - EnumSet RETRYABLE_STATUS_CODES = + private static final EnumSet RETRYABLE_STATUS_CODES = EnumSet.of(Status.Code.TimedOut, Status.Code.TryAgain, Status.Code.Busy); + /** + * Determines if an operation can be retried based on the error received. This method checks if + * the cause of the StorageException is a RocksDBException. If it is, it retrieves the status code + * of the RocksDBException and checks if it is contained in the list of retryable {@link + * StorageExceptionManager.RETRYABLE_STATUS_CODES} status codes. + * + * @param e the StorageException to check + * @return true if the operation can be retried, false otherwise + */ public static boolean canRetryOnError(final StorageException e) { - return e.getMessage().contains(rocksdbClassName) - && (e.getMessage().contains(ERR_BUSY) || e.getMessage().contains(ERR_LOCK_TIMED_OUT)); + return Optional.of(e.getCause()) + .filter(z -> z instanceof RocksDBException) + .map(RocksDBException.class::cast) + .map(RocksDBException::getStatus) + .map(Status::getCode) + .map(RETRYABLE_STATUS_CODES::contains) + .orElse(false); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java index 79d13b15169..5ddd279a12b 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java @@ -65,7 +65,7 @@ public List> persist( if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. - LOG.info("retry on rocksdb issue " + storageException.getMessage()); + LOG.info("Retry on rocksdb issue " + storageException.getMessage()); tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null)); } else { throw storageException; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java index efa2902df9c..4e5e12756f2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java @@ -84,7 +84,7 @@ public Stream> loadLocalDataTrieNode( if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. - LOG.info("retry on rocksdb issue " + storageException.getMessage()); + LOG.info("Retry on rocksdb issue " + storageException.getMessage()); task.getData().clear(); } else { throw storageException; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java index 71f95e96e05..604c7dc80b4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java @@ -90,7 +90,7 @@ public List> persist(final List> tas if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. - LOG.info("retry on rocksdb issue " + storageException.getMessage()); + LOG.info("Retry on rocksdb issue " + storageException.getMessage()); tasks.forEach(task -> task.getData().clear()); } else { throw storageException; From f1616aaf301837eef3599b1518dd4abadf861d5c Mon Sep 17 00:00:00 2001 From: Karim TAAM Date: Thu, 19 Oct 2023 16:48:51 +0200 Subject: [PATCH 8/8] not display all retryable issues Signed-off-by: Karim TAAM --- .../eth/sync/StorageExceptionManager.java | 16 ++++++++++++++++ .../fastsync/worldstate/PersistDataStep.java | 9 ++++++++- .../eth/sync/snapsync/LoadLocalDataStep.java | 9 ++++++++- .../eth/sync/snapsync/PersistDataStep.java | 12 ++++++++++-- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java index 97fb94d2919..97d1506cf3e 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/StorageExceptionManager.java @@ -27,6 +27,9 @@ public final class StorageExceptionManager { private static final EnumSet RETRYABLE_STATUS_CODES = EnumSet.of(Status.Code.TimedOut, Status.Code.TryAgain, Status.Code.Busy); + private static final long ERROR_THRESHOLD = 1000; + + private static long retryableErrorCounter; /** * Determines if an operation can be retried based on the error received. This method checks if * the cause of the StorageException is a RocksDBException. If it is, it retrieves the status code @@ -43,6 +46,19 @@ public static boolean canRetryOnError(final StorageException e) { .map(RocksDBException::getStatus) .map(Status::getCode) .map(RETRYABLE_STATUS_CODES::contains) + .map( + result -> { + retryableErrorCounter++; + return result; + }) .orElse(false); } + + public static long getRetryableErrorCounter() { + return retryableErrorCounter; + } + + public static boolean errorCountAtThreshold() { + return retryableErrorCounter % ERROR_THRESHOLD == 1; + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java index 5ddd279a12b..1ab202ee6ce 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/worldstate/PersistDataStep.java @@ -15,6 +15,8 @@ package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.errorCountAtThreshold; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.getRetryableErrorCounter; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; @@ -65,7 +67,12 @@ public List> persist( if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. - LOG.info("Retry on rocksdb issue " + storageException.getMessage()); + if (errorCountAtThreshold()) { + LOG.info( + "Encountered {} retryable RocksDB errors, latest error message {}", + getRetryableErrorCounter(), + storageException.getMessage()); + } tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null)); } else { throw storageException; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java index 4e5e12756f2..c24dbf6037d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/LoadLocalDataStep.java @@ -15,6 +15,8 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync; import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.errorCountAtThreshold; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.getRetryableErrorCounter; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.heal.TrieNodeHealingRequest; @@ -84,7 +86,12 @@ public Stream> loadLocalDataTrieNode( if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the // pipeline, and it will attempt to execute it again later. - LOG.info("Retry on rocksdb issue " + storageException.getMessage()); + if (errorCountAtThreshold()) { + LOG.info( + "Encountered {} retryable RocksDB errors, latest error message {}", + getRetryableErrorCounter(), + storageException.getMessage()); + } task.getData().clear(); } else { throw storageException; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java index 604c7dc80b4..df3696ccdf7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/PersistDataStep.java @@ -15,6 +15,8 @@ package org.hyperledger.besu.ethereum.eth.sync.snapsync; import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.errorCountAtThreshold; +import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.getRetryableErrorCounter; import org.hyperledger.besu.ethereum.bonsai.storage.BonsaiWorldStateKeyValueStorage; import org.hyperledger.besu.ethereum.eth.sync.snapsync.request.SnapDataRequest; @@ -89,8 +91,14 @@ public List> persist(final List> tas } catch (StorageException storageException) { if (canRetryOnError(storageException)) { // We reset the task by setting it to null. This way, it is considered as failed by the - // pipeline, and it will attempt to execute it again later. - LOG.info("Retry on rocksdb issue " + storageException.getMessage()); + // pipeline, and it will attempt to execute it again later. not display all the retryable + // issues + if (errorCountAtThreshold()) { + LOG.info( + "Encountered {} retryable RocksDB errors, latest error message {}", + getRetryableErrorCounter(), + storageException.getMessage()); + } tasks.forEach(task -> task.getData().clear()); } else { throw storageException;