Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb;

import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
import org.hyperledger.besu.plugin.services.BesuConfiguration;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.exception.StorageException;
Expand All @@ -28,7 +27,6 @@
import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.OptimisticRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.TransactionDBRocksDBColumnarKeyValueStorage;
import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;

import java.io.IOException;
Expand Down Expand Up @@ -166,8 +164,10 @@ public SegmentedKeyValueStorage create(
final BesuConfiguration commonConfiguration,
final MetricsSystem metricsSystem)
throws StorageException {
final boolean isForestStorageFormat =
DataStorageFormat.FOREST.getDatabaseVersion() == commonConfiguration.getDatabaseVersion();
// TODO: removed for testing/discovery
// final boolean isForestStorageFormat =
// DataStorageFormat.FOREST.getDatabaseVersion() ==
// commonConfiguration.getDatabaseVersion();
if (requiresInit()) {
init(commonConfiguration);
}
Expand All @@ -192,25 +192,26 @@ public SegmentedKeyValueStorage create(
configuredSegments.stream()
.filter(segmentId -> segmentId.includeInDatabaseVersion(databaseVersion))
.collect(Collectors.toList());
if (isForestStorageFormat) {
LOG.debug("FOREST mode detected, using TransactionDB.");
segmentedStorage =
new TransactionDBRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
} else {
LOG.debug("Using OptimisticTransactionDB.");
segmentedStorage =
new OptimisticRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
}
// TODO: removing TransactionDB for testing/discovery only:
// if (isForestStorageFormat) {
// LOG.debug("FOREST mode detected, using TransactionDB.");
// segmentedStorage =
// new TransactionDBRocksDBColumnarKeyValueStorage(
// rocksDBConfiguration,
// segmentsForVersion,
// ignorableSegments,
// metricsSystem,
// rocksDBMetricsFactory);
// } else {
LOG.debug("Using OptimisticTransactionDB.");
segmentedStorage =
new OptimisticRocksDBColumnarKeyValueStorage(
rocksDBConfiguration,
segmentsForVersion,
ignorableSegments,
metricsSystem,
rocksDBMetricsFactory);
// }
}
return segmentedStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb;

import static org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBTransaction.RetryableRocksDBAction.maybeRetryRocksDBAction;

import org.hyperledger.besu.plugin.services.exception.StorageException;
import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;

import java.util.EnumSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
Expand All @@ -31,7 +38,8 @@
/** The RocksDb transaction. */
public class RocksDBTransaction implements SegmentedKeyValueStorageTransaction {
private static final Logger logger = LoggerFactory.getLogger(RocksDBTransaction.class);
private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
private static final String ERR_NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
private static final int DEFAULT_MAX_RETRIES = 5;

private final RocksDBMetrics metrics;
private final Transaction innerTx;
Expand Down Expand Up @@ -62,11 +70,11 @@ public void put(final SegmentIdentifier segmentId, final byte[] key, final byte[
try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
innerTx.put(columnFamilyMapper.apply(segmentId), key, value);
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(e.getMessage());
System.exit(0);
}
throw new StorageException(e);
maybeRetryRocksDBAction(
e,
0,
DEFAULT_MAX_RETRIES,
() -> innerTx.put(columnFamilyMapper.apply(segmentId), key, value));
}
}

Expand All @@ -75,11 +83,11 @@ public void remove(final SegmentIdentifier segmentId, final byte[] key) {
try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
innerTx.delete(columnFamilyMapper.apply(segmentId), key);
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(e.getMessage());
System.exit(0);
}
throw new StorageException(e);
maybeRetryRocksDBAction(
e,
0,
DEFAULT_MAX_RETRIES,
() -> innerTx.delete(columnFamilyMapper.apply(segmentId), key));
}
}

Expand All @@ -88,11 +96,7 @@ public void commit() throws StorageException {
try (final OperationTimer.TimingContext ignored = metrics.getCommitLatency().startTimer()) {
innerTx.commit();
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(e.getMessage());
System.exit(0);
}
throw new StorageException(e);
maybeRetryRocksDBAction(e, 0, DEFAULT_MAX_RETRIES, innerTx::commit);
} finally {
close();
}
Expand All @@ -102,14 +106,10 @@ public void commit() throws StorageException {
public void rollback() {
try {
innerTx.rollback();
metrics.getRollbackCount().inc();
} catch (final RocksDBException e) {
if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(e.getMessage());
System.exit(0);
}
throw new StorageException(e);
maybeRetryRocksDBAction(e, 0, DEFAULT_MAX_RETRIES, innerTx::rollback);
} finally {
metrics.getRollbackCount().inc();
close();
}
}
Expand All @@ -118,4 +118,71 @@ private void close() {
innerTx.close();
options.close();
}

@FunctionalInterface
interface RetryableRocksDBAction {
void retry() throws RocksDBException;

EnumSet<Status.Code> RETRYABLE_STATUS_CODES =
EnumSet.of(Status.Code.TimedOut, Status.Code.TryAgain, Status.Code.Busy);

static void maybeRetryRocksDBAction(
final RocksDBException ex,
final int attemptNumber,
final int retryLimit,
final RetryableRocksDBAction retryAction) {

if (ex.getMessage().contains(ERR_NO_SPACE_LEFT_ON_DEVICE)) {
logger.error(ex.getMessage());
System.exit(0);
}
if (attemptNumber <= retryLimit) {
if (RETRYABLE_STATUS_CODES.contains(ex.getStatus().getCode())) {
logger.warn(
"RocksDB Transient exception caught on attempt {} of {}, status: {}, retrying.",
attemptNumber,
retryLimit,
ex.getStatus().getCodeString());
try {
retryBackoff();
retryAction.retry();
} catch (RocksDBException ex2) {
maybeRetryRocksDBAction(ex2, attemptNumber + 1, retryLimit, retryAction);
}
}
} else {
throw new StorageException(ex);
}
}

long BASE_TIMEOUT = 1000; // Base timeout in milliseconds
long MAX_TIMEOUT = 30000; // Max timeout in milliseconds
long DECAY_TIME = 5000; // Time in milliseconds after which the timeout decays
AtomicLong timeout = new AtomicLong(BASE_TIMEOUT);
AtomicLong lastCallTime = new AtomicLong(System.currentTimeMillis());

@VisibleForTesting
static void resetTimeout(final long timeoutVal) {
timeout.set(timeoutVal);
}

static void retryBackoff() {
try {
long currentTime = System.currentTimeMillis();
long delay = timeout.get();
// If no retries for DECAY_TIME milliseconds, decay the timeout towards base value
long callDiff = currentTime - lastCallTime.get();
if (callDiff > DECAY_TIME) {
delay = Math.max(BASE_TIMEOUT, delay - callDiff / 2);
timeout.set(delay);
}
TimeUnit.MILLISECONDS.sleep(delay);
// Increase the timeout for the next call, up to the maximum
timeout.updateAndGet(t -> Math.min(t * 2, MAX_TIMEOUT));
lastCallTime.set(currentTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Preserve interrupt status
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteOptions;

/** TransactionDB RocksDB Columnar key value storage */
Expand Down Expand Up @@ -85,10 +86,15 @@ RocksDB getDB() {
public SegmentedKeyValueStorageTransaction startTransaction() throws StorageException {
throwIfClosed();
final WriteOptions writeOptions = new WriteOptions();
final TransactionOptions transactionOptions =
new TransactionOptions().setLockTimeout(5000).setDeadlockDetect(true);
writeOptions.setIgnoreMissingColumnFamilies(true);
return new SegmentedKeyValueStorageTransactionValidatorDecorator(
new RocksDBTransaction(
this::safeColumnHandle, db.beginTransaction(writeOptions), writeOptions, metrics),
this::safeColumnHandle,
db.beginTransaction(writeOptions, transactionOptions),
writeOptions,
metrics),
this.closed::get);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Hyperledger Besu Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.plugin.services.storage.rocksdb;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static org.mockito.Answers.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;

import org.hyperledger.besu.plugin.services.exception.StorageException;

import java.nio.file.Path;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.Status;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;

@ExtendWith(MockitoExtension.class)
public class RocksDBTransactionTest {
static final Status BUSY = new Status(Status.Code.Busy, Status.SubCode.None, "Busy");
static final Status TIMED_OUT =
new Status(Status.Code.TimedOut, Status.SubCode.LockTimeout, "TimedOut(LockTimeout)");

@TempDir public Path folder;

@Mock(answer = RETURNS_DEEP_STUBS)
RocksDBMetrics mockMetrics;

@Mock Transaction mockTransaction;
@Mock WriteOptions mockOptions;

RocksDBTransaction tx;

@BeforeEach
void setupTx() {
tx = spy(new RocksDBTransaction(__ -> null, mockTransaction, mockOptions, mockMetrics));
RocksDBTransaction.RetryableRocksDBAction.resetTimeout(1);
}

@Test
public void assertNominalBehavior() throws Exception {
assertThatCode(tx::commit).doesNotThrowAnyException();
}

@Test
public void assertDefaultBusyRetryBehavior() throws Exception {
doThrow(new RocksDBException("Busy", BUSY))
.doThrow(new RocksDBException("Busy", BUSY))
.doNothing()
.when(mockTransaction)
.commit();

assertThatCode(tx::commit).doesNotThrowAnyException();
}

@Test
public void assertLockTimeoutBusyRetryBehavior() throws Exception {
doThrow(new RocksDBException("Busy", BUSY))
.doThrow(new RocksDBException("TimedOut(LockTimeout)", TIMED_OUT))
.doThrow(new RocksDBException("TimedOut(LockTimeout)", TIMED_OUT))
.doNothing()
.when(mockTransaction)
.commit();

assertThatCode(() -> tx.commit()).doesNotThrowAnyException();
}

@Test
public void assertBusyRetryFailBehavior() throws Exception {
doThrow(new RocksDBException("Busy", BUSY)).when(mockTransaction).commit();

assertThatThrownBy(tx::commit)
.isInstanceOf(StorageException.class)
.hasCauseInstanceOf(RocksDBException.class)
.hasMessageContaining("Busy");
}

@Test
public void assertRocksTxCloseOnRetryDoesNotThrow() throws Exception {
try (final OptimisticTransactionDB db =
OptimisticTransactionDB.open(new Options().setCreateIfMissing(true), folder.toString())) {
var writeOptions = new WriteOptions();
Transaction innerTx = spy(db.beginTransaction(writeOptions));

tx = spy(new RocksDBTransaction(__ -> null, innerTx, writeOptions, mockMetrics));

doThrow(new RocksDBException("Busy", BUSY))
.doThrow(new RocksDBException("Busy", BUSY))
.doCallRealMethod()
.when(innerTx)
.commit();

assertThatCode(tx::commit).doesNotThrowAnyException();
}
}
}