diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java
index 0f08e3c0f9b..b00bb447fd6 100644
--- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java
+++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBKeyValueStorageFactory.java
@@ -14,7 +14,6 @@
  */
 package org.hyperledger.besu.plugin.services.storage.rocksdb;
 
-import org.hyperledger.besu.ethereum.worldstate.DataStorageFormat;
 import org.hyperledger.besu.plugin.services.BesuConfiguration;
 import org.hyperledger.besu.plugin.services.MetricsSystem;
 import org.hyperledger.besu.plugin.services.exception.StorageException;
@@ -28,7 +27,6 @@
 import org.hyperledger.besu.plugin.services.storage.rocksdb.configuration.RocksDBFactoryConfiguration;
 import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.OptimisticRocksDBColumnarKeyValueStorage;
 import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.RocksDBColumnarKeyValueStorage;
-import org.hyperledger.besu.plugin.services.storage.rocksdb.segmented.TransactionDBRocksDBColumnarKeyValueStorage;
 import org.hyperledger.besu.services.kvstore.SegmentedKeyValueStorageAdapter;
 
 import java.io.IOException;
@@ -166,8 +164,6 @@ public SegmentedKeyValueStorage create(
       final BesuConfiguration commonConfiguration,
       final MetricsSystem metricsSystem)
       throws StorageException {
-    final boolean isForestStorageFormat =
-        DataStorageFormat.FOREST.getDatabaseVersion() == commonConfiguration.getDatabaseVersion();
     if (requiresInit()) {
       init(commonConfiguration);
     }
@@ -192,25 +188,16 @@ public SegmentedKeyValueStorage create(
           configuredSegments.stream()
               .filter(segmentId -> segmentId.includeInDatabaseVersion(databaseVersion))
               .collect(Collectors.toList());
-      if (isForestStorageFormat) {
-        LOG.debug("FOREST mode detected, using TransactionDB.");
-        segmentedStorage =
-            new TransactionDBRocksDBColumnarKeyValueStorage(
-                rocksDBConfiguration,
-                segmentsForVersion,
-                ignorableSegments,
-                metricsSystem,
-                rocksDBMetricsFactory);
-      } else {
-        LOG.debug("Using OptimisticTransactionDB.");
-        segmentedStorage =
-            new OptimisticRocksDBColumnarKeyValueStorage(
-                rocksDBConfiguration,
-                segmentsForVersion,
-                ignorableSegments,
-                metricsSystem,
-                rocksDBMetricsFactory);
-      }
+      // TODO: the TransactionDB branch for DataStorageFormat.FOREST is removed for
+      // testing/discovery only; until it is restored every format uses OptimisticTransactionDB.
+      LOG.debug("Using OptimisticTransactionDB.");
+      segmentedStorage =
+          new OptimisticRocksDBColumnarKeyValueStorage(
+              rocksDBConfiguration,
+              segmentsForVersion,
+              ignorableSegments,
+              metricsSystem,
+              rocksDBMetricsFactory);
     }
     return segmentedStorage;
   }
diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java
index 53f7a8fedac..4b34719362a 100644
--- a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java
+++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransaction.java
@@ -14,15 +14,22 @@
  */
 package org.hyperledger.besu.plugin.services.storage.rocksdb;
 
+import static
org.hyperledger.besu.plugin.services.storage.rocksdb.RocksDBTransaction.RetryableRocksDBAction.maybeRetryRocksDBAction;
+
 import org.hyperledger.besu.plugin.services.exception.StorageException;
 import org.hyperledger.besu.plugin.services.metrics.OperationTimer;
 import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier;
 import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction;
 
+import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.Status;
 import org.rocksdb.Transaction;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -31,7 +38,8 @@
 /** The RocksDb transaction. */
 public class RocksDBTransaction implements SegmentedKeyValueStorageTransaction {
   private static final Logger logger = LoggerFactory.getLogger(RocksDBTransaction.class);
-  private static final String NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
+  private static final String ERR_NO_SPACE_LEFT_ON_DEVICE = "No space left on device";
+  private static final int DEFAULT_MAX_RETRIES = 5;
 
   private final RocksDBMetrics metrics;
   private final Transaction innerTx;
@@ -62,11 +70,11 @@ public void put(final SegmentIdentifier segmentId, final byte[] key, final byte[
     try (final OperationTimer.TimingContext ignored = metrics.getWriteLatency().startTimer()) {
       innerTx.put(columnFamilyMapper.apply(segmentId), key, value);
     } catch (final RocksDBException e) {
-      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
-        logger.error(e.getMessage());
-        System.exit(0);
-      }
-      throw new StorageException(e);
+      maybeRetryRocksDBAction(
+          e,
+          0,
+          DEFAULT_MAX_RETRIES,
+          () -> innerTx.put(columnFamilyMapper.apply(segmentId), key, value));
     }
   }
 
@@ -75,11 +83,11 @@ public void remove(final SegmentIdentifier segmentId, final byte[] key) {
     try (final OperationTimer.TimingContext ignored = metrics.getRemoveLatency().startTimer()) {
       innerTx.delete(columnFamilyMapper.apply(segmentId), key);
     } catch (final RocksDBException e) {
-      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
-        logger.error(e.getMessage());
-        System.exit(0);
-      }
-      throw new StorageException(e);
+      maybeRetryRocksDBAction(
+          e,
+          0,
+          DEFAULT_MAX_RETRIES,
+          () -> innerTx.delete(columnFamilyMapper.apply(segmentId), key));
     }
   }
 
@@ -88,11 +96,7 @@ public void commit() throws StorageException {
     try (final OperationTimer.TimingContext ignored = metrics.getCommitLatency().startTimer()) {
       innerTx.commit();
     } catch (final RocksDBException e) {
-      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
-        logger.error(e.getMessage());
-        System.exit(0);
-      }
-      throw new StorageException(e);
+      maybeRetryRocksDBAction(e, 0, DEFAULT_MAX_RETRIES, innerTx::commit);
     } finally {
       close();
     }
@@ -102,14 +106,10 @@ public void commit() throws StorageException {
   public void rollback() {
     try {
       innerTx.rollback();
-      metrics.getRollbackCount().inc();
     } catch (final RocksDBException e) {
-      if (e.getMessage().contains(NO_SPACE_LEFT_ON_DEVICE)) {
-        logger.error(e.getMessage());
-        System.exit(0);
-      }
-      throw new StorageException(e);
+      maybeRetryRocksDBAction(e, 0, DEFAULT_MAX_RETRIES, innerTx::rollback);
     } finally {
+      metrics.getRollbackCount().inc();
       close();
     }
   }
@@ -118,4 +118,113 @@ private void close() {
     innerTx.close();
     options.close();
   }
+
+  /**
+   * An action on the underlying RocksDB transaction that can be re-attempted after a transient
+   * failure.
+   */
+  @FunctionalInterface
+  interface RetryableRocksDBAction {
+    /**
+     * Re-executes the failed action.
+     *
+     * @throws RocksDBException if the action fails again
+     */
+    void retry() throws RocksDBException;
+
+    /** Status codes that are treated as transient and therefore retried. */
+    EnumSet<Status.Code> RETRYABLE_STATUS_CODES =
+        EnumSet.of(Status.Code.TimedOut, Status.Code.TryAgain, Status.Code.Busy);
+
+    /**
+     * Handles a failed RocksDB action: exits the process when the disk is full, retries with
+     * backoff while the failure is transient, and otherwise rethrows it as a StorageException.
+     *
+     * @param ex the failure raised by the most recent attempt
+     * @param attemptNumber the number of retries already performed (0 for the first failure)
+     * @param retryLimit the maximum number of retries
+     * @param retryAction the action to re-attempt
+     * @throws StorageException if the failure is not retryable, retries are exhausted, or the
+     *     thread is interrupted while backing off
+     */
+    static void maybeRetryRocksDBAction(
+        final RocksDBException ex,
+        final int attemptNumber,
+        final int retryLimit,
+        final RetryableRocksDBAction retryAction) {
+
+      RocksDBException lastFailure = ex;
+      for (int attempt = attemptNumber; ; attempt++) {
+        final String message = lastFailure.getMessage();
+        if (message != null && message.contains(ERR_NO_SPACE_LEFT_ON_DEVICE)) {
+          logger.error(message);
+          System.exit(0);
+        }
+        // getStatus() is null when the exception carries a message only.
+        final Status status = lastFailure.getStatus();
+        final boolean retryable =
+            status != null && RETRYABLE_STATUS_CODES.contains(status.getCode());
+        if (!retryable || attempt >= retryLimit) {
+          // Never swallow: non-transient failures and exhausted retries must reach the caller.
+          throw new StorageException(lastFailure);
+        }
+        logger.warn(
+            "RocksDB Transient exception caught on attempt {} of {}, status: {}, retrying.",
+            attempt + 1,
+            retryLimit,
+            status.getCodeString());
+        retryBackoff();
+        if (Thread.currentThread().isInterrupted()) {
+          // retryBackoff() re-sets the interrupt flag; give up instead of retrying.
+          throw new StorageException(lastFailure);
+        }
+        try {
+          retryAction.retry();
+          return;
+        } catch (final RocksDBException retryFailure) {
+          lastFailure = retryFailure;
+        }
+      }
+    }
+
+    // Interface fields are implicitly static: this backoff state is shared by all transactions.
+    long BASE_TIMEOUT = 1000; // Base timeout in milliseconds
+    long MAX_TIMEOUT = 30000; // Max timeout in milliseconds
+    long DECAY_TIME = 5000; // Time in milliseconds after which the timeout decays
+    AtomicLong timeout = new AtomicLong(BASE_TIMEOUT);
+    AtomicLong lastCallTime = new AtomicLong(System.currentTimeMillis());
+
+    /**
+     * Overrides the current backoff delay.
+     *
+     * @param timeoutVal the delay, in milliseconds, for the next backoff
+     */
+    @VisibleForTesting
+    static void resetTimeout(final long timeoutVal) {
+      timeout.set(timeoutVal);
+    }
+
+    /**
+     * Sleeps for the current backoff delay, then doubles the delay up to MAX_TIMEOUT. If the
+     * thread is interrupted, returns early with the interrupt flag set.
+     */
+    static void retryBackoff() {
+      try {
+        long currentTime = System.currentTimeMillis();
+        long delay = timeout.get();
+        // If no retries for DECAY_TIME milliseconds, decay the timeout towards base value
+        long callDiff = currentTime - lastCallTime.get();
+        if (callDiff > DECAY_TIME) {
+          delay = Math.max(BASE_TIMEOUT, delay - callDiff / 2);
+          timeout.set(delay);
+        }
+        TimeUnit.MILLISECONDS.sleep(delay);
+        // Increase the timeout for the next call, up to the maximum
+        timeout.updateAndGet(t -> Math.min(t * 2, MAX_TIMEOUT));
+        lastCallTime.set(currentTime);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Preserve interrupt status
+      }
+    }
+  }
 }
diff --git a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java
index 4825154561a..6ade2d5e80f 100644
--- 
a/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java
+++ b/plugins/rocksdb/src/main/java/org/hyperledger/besu/plugin/services/storage/rocksdb/segmented/TransactionDBRocksDBColumnarKeyValueStorage.java
@@ -28,6 +28,7 @@
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.TransactionDB;
+import org.rocksdb.TransactionOptions;
 import org.rocksdb.WriteOptions;
 
 /** TransactionDB RocksDB Columnar key value storage */
@@ -85,10 +86,18 @@ RocksDB getDB() {
   public SegmentedKeyValueStorageTransaction startTransaction() throws StorageException {
     throwIfClosed();
     final WriteOptions writeOptions = new WriteOptions();
     writeOptions.setIgnoreMissingColumnFamilies(true);
-    return new SegmentedKeyValueStorageTransactionValidatorDecorator(
-        new RocksDBTransaction(
-            this::safeColumnHandle, db.beginTransaction(writeOptions), writeOptions, metrics),
-        this.closed::get);
+    // TransactionOptions wraps a native handle: close it once the transaction has been created
+    // so that one is not leaked per transaction. The lock timeout is in milliseconds.
+    try (final TransactionOptions transactionOptions =
+        new TransactionOptions().setLockTimeout(5000).setDeadlockDetect(true)) {
+      return new SegmentedKeyValueStorageTransactionValidatorDecorator(
+          new RocksDBTransaction(
+              this::safeColumnHandle,
+              db.beginTransaction(writeOptions, transactionOptions),
+              writeOptions,
+              metrics),
+          this.closed::get);
+    }
   }
 }
diff --git a/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransactionTest.java b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransactionTest.java
new file mode 100644
index 00000000000..199b7f9b383
--- /dev/null
+++ b/plugins/rocksdb/src/test/java/org/hyperledger/besu/plugin/services/storage/rocksdb/RocksDBTransactionTest.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright Hyperledger Besu Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package org.hyperledger.besu.plugin.services.storage.rocksdb;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import org.hyperledger.besu.plugin.services.exception.StorageException;
+
+import java.nio.file.Path;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.rocksdb.OptimisticTransactionDB;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Status;
+import org.rocksdb.Transaction;
+import org.rocksdb.WriteOptions;
+
+@ExtendWith(MockitoExtension.class)
+public class RocksDBTransactionTest {
+  static final Status BUSY = new Status(Status.Code.Busy, Status.SubCode.None, "Busy");
+  static final Status TIMED_OUT =
+      new Status(Status.Code.TimedOut, Status.SubCode.LockTimeout, "TimedOut(LockTimeout)");
+
+  @TempDir public Path folder;
+
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  RocksDBMetrics mockMetrics;
+
+  @Mock Transaction mockTransaction;
+  @Mock WriteOptions mockOptions;
+
+  RocksDBTransaction tx;
+
+  @BeforeEach
+  void setupTx() {
+    tx = spy(new RocksDBTransaction(__ -> null, mockTransaction, mockOptions, mockMetrics));
+    RocksDBTransaction.RetryableRocksDBAction.resetTimeout(1);
+  }
+
+  @Test
+  public void assertNominalBehavior() throws Exception {
+    assertThatCode(tx::commit).doesNotThrowAnyException();
+  }
+
+  @Test
+  public void assertDefaultBusyRetryBehavior() throws Exception {
+    doThrow(new RocksDBException("Busy", BUSY))
+        .doThrow(new RocksDBException("Busy", BUSY))
+        .doNothing()
+        .when(mockTransaction)
+        .commit();
+
+    assertThatCode(tx::commit).doesNotThrowAnyException();
+  }
+
+  @Test
+  public void assertLockTimeoutBusyRetryBehavior() throws Exception {
+    doThrow(new RocksDBException("Busy", BUSY))
+        .doThrow(new RocksDBException("TimedOut(LockTimeout)", TIMED_OUT))
+        .doThrow(new RocksDBException("TimedOut(LockTimeout)", TIMED_OUT))
+        .doNothing()
+        .when(mockTransaction)
+        .commit();
+
+    assertThatCode(() -> tx.commit()).doesNotThrowAnyException();
+  }
+
+  @Test
+  public void assertBusyRetryFailBehavior() throws Exception {
+    doThrow(new RocksDBException("Busy", BUSY)).when(mockTransaction).commit();
+
+    assertThatThrownBy(tx::commit)
+        .isInstanceOf(StorageException.class)
+        .hasCauseInstanceOf(RocksDBException.class)
+        .hasMessageContaining("Busy");
+  }
+
+  @Test
+  public void assertRocksTxCloseOnRetryDoesNotThrow() throws Exception {
+    try (final Options dbOptions = new Options().setCreateIfMissing(true);
+        final var db = OptimisticTransactionDB.open(dbOptions, folder.toString())) {
+      var writeOptions = new WriteOptions();
+      Transaction innerTx = spy(db.beginTransaction(writeOptions));
+
+      tx = spy(new RocksDBTransaction(__ -> null, innerTx, writeOptions, mockMetrics));
+
+      doThrow(new RocksDBException("Busy", BUSY))
+          .doThrow(new RocksDBException("Busy", BUSY))
+          .doCallRealMethod()
+          .when(innerTx)
+          .commit();
+
+      assertThatCode(tx::commit).doesNotThrowAnyException();
+    }
+  }
+}