-
Notifications
You must be signed in to change notification settings - Fork 1.1k
add retry logic on sync pipeline for rocksdb issue #6004
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
371f5e7
5bd41c2
7e7a3a7
2ff1aa6
9296b27
67cf31e
d18cd85
feff9cc
7a58c5c
4dc9bf7
f1616aa
d532c9e
5044839
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,29 @@ | ||||||||||||||||||||||
| /* | ||||||||||||||||||||||
| * Copyright ConsenSys AG. | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with | ||||||||||||||||||||||
| * the License. You may obtain a copy of the License at | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on | ||||||||||||||||||||||
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the | ||||||||||||||||||||||
| * specific language governing permissions and limitations under the License. | ||||||||||||||||||||||
| * | ||||||||||||||||||||||
| * SPDX-License-Identifier: Apache-2.0 | ||||||||||||||||||||||
| */ | ||||||||||||||||||||||
| package org.hyperledger.besu.ethereum.eth.sync; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| import org.hyperledger.besu.plugin.services.exception.StorageException; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public final class StorageExceptionManager { | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| private static final String rocksdbClassName = "RocksDBException"; | ||||||||||||||||||||||
| private static final String ERR_BUSY = "Busy"; | ||||||||||||||||||||||
| private static final String ERR_LOCK_TIMED_OUT = "TimedOut(LockTimeout)"; | ||||||||||||||||||||||
|
matkt marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||
|
|
||||||||||||||||||||||
| public static boolean canRetryOnError(final StorageException e) { | ||||||||||||||||||||||
| return e.getMessage().contains(rocksdbClassName) | ||||||||||||||||||||||
| && (e.getMessage().contains(ERR_BUSY) || e.getMessage().contains(ERR_LOCK_TIMED_OUT)); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be less brittle to use enum values.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should be good; I will update it. |
||||||||||||||||||||||
| } | ||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,15 +14,24 @@ | |
| */ | ||
| package org.hyperledger.besu.ethereum.eth.sync.fastsync.worldstate; | ||
|
|
||
| import static org.hyperledger.besu.ethereum.eth.sync.StorageExceptionManager.canRetryOnError; | ||
|
|
||
| import org.hyperledger.besu.ethereum.core.BlockHeader; | ||
| import org.hyperledger.besu.ethereum.eth.sync.worldstate.WorldDownloadState; | ||
| import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage; | ||
| import org.hyperledger.besu.ethereum.worldstate.WorldStateStorage.Updater; | ||
| import org.hyperledger.besu.plugin.services.exception.StorageException; | ||
| import org.hyperledger.besu.services.tasks.Task; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| public class PersistDataStep { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(PersistDataStep.class); | ||
|
|
||
| private final WorldStateStorage worldStateStorage; | ||
|
|
||
| public PersistDataStep(final WorldStateStorage worldStateStorage) { | ||
|
|
@@ -33,24 +42,35 @@ public List<Task<NodeDataRequest>> persist( | |
| final List<Task<NodeDataRequest>> tasks, | ||
| final BlockHeader blockHeader, | ||
| final WorldDownloadState<NodeDataRequest> downloadState) { | ||
| final Updater updater = worldStateStorage.updater(); | ||
| tasks.stream() | ||
| .map( | ||
| task -> { | ||
| enqueueChildren(task, downloadState); | ||
| return task; | ||
| }) | ||
| .map(Task::getData) | ||
| .filter(request -> request.getData() != null) | ||
| .forEach( | ||
| request -> { | ||
| if (isRootState(blockHeader, request)) { | ||
| downloadState.setRootNodeData(request.getData()); | ||
| } else { | ||
| request.persist(updater); | ||
| } | ||
| }); | ||
| updater.commit(); | ||
| try { | ||
| final Updater updater = worldStateStorage.updater(); | ||
| tasks.stream() | ||
| .map( | ||
| task -> { | ||
| enqueueChildren(task, downloadState); | ||
| return task; | ||
| }) | ||
| .map(Task::getData) | ||
| .filter(request -> request.getData() != null) | ||
| .forEach( | ||
| request -> { | ||
| if (isRootState(blockHeader, request)) { | ||
| downloadState.setRootNodeData(request.getData()); | ||
| } else { | ||
| request.persist(updater); | ||
| } | ||
| }); | ||
| updater.commit(); | ||
| } catch (StorageException storageException) { | ||
| if (canRetryOnError(storageException)) { | ||
| // We reset the task by setting it to null. This way, it is considered as failed by the | ||
| // pipeline, and it will attempt to execute it again later. | ||
| LOG.info("retry on rocksdb issue " + storageException.getMessage()); | ||
| tasks.forEach(nodeDataRequestTask -> nodeDataRequestTask.getData().setData(null)); | ||
| } else { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we not fail the future for an individual task in the forEach, rather than failing the whole list? Or would handling a single failed task future require a lot more plumbing in the pipeline?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not possible, because all modifications are in the same updater and we cannot commit only part of a transaction. |
||
| throw storageException; | ||
| } | ||
| } | ||
| return tasks; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -231,6 +231,22 @@ public SnapWorldStateDownloadProcess build() { | |
| "step", | ||
| "action"); | ||
|
|
||
| /* | ||
| The logic and intercommunication of different pipelines can be summarized as follows: | ||
|
|
||
| 1. Account Data Pipeline (fetchAccountDataPipeline): This process starts with downloading the leaves of the account tree in ranges, with multiple ranges being processed simultaneously. | ||
| If the downloaded accounts are smart contracts, tasks are created in the storage pipeline to download the storage tree of the smart contract, and in the code download pipeline for the smart contract. | ||
|
|
||
| 2. Storage Data Pipeline (fetchStorageDataPipeline): Running parallel to the account data pipeline, this pipeline downloads the storage of smart contracts. | ||
| If all slots cannot be downloaded at once, tasks are created in the fetchLargeStorageDataPipeline to download the storage by range, allowing parallelization of large account downloads. | ||
|
|
||
| 3. Code Data Pipeline (fetchCodePipeline): This pipeline, running concurrently with the account and storage data pipelines, is responsible for downloading the code of the smart contracts. | ||
|
|
||
| 4. Large Storage Data Pipeline (fetchLargeStorageDataPipeline): This pipeline is used when the storage data for a smart contract is too large to be downloaded at once. | ||
| It enables the storage data to be downloaded in ranges, similar to the account data. | ||
|
|
||
| 5. Healing Phase: Initiated after all other pipelines have completed their tasks, this phase ensures the integrity and completeness of the downloaded data. | ||
| */ | ||
|
Comment on lines
+234
to
+249
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🙏 |
||
| final Pipeline<Task<SnapDataRequest>> completionPipeline = | ||
| PipelineBuilder.<Task<SnapDataRequest>>createPipeline( | ||
| "requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.