diff --git a/CHANGELOG.md b/CHANGELOG.md index a4e920f3a202d..4978c7aed3cfe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for Azure Managed Identity in repository-azure ([#12423](https://github.com/opensearch-project/OpenSearch/issues/12423)) - Add useCompoundFile index setting ([#13478](https://github.com/opensearch-project/OpenSearch/pull/13478)) - Make outbound side of transport protocol dependent ([#13293](https://github.com/opensearch-project/OpenSearch/pull/13293)) +- [Remote Store] Add dynamic cluster settings to set timeout for segments upload to Remote Store ([#13679](https://github.com/opensearch-project/OpenSearch/pull/13679)) ### Dependencies - Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559)) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 65016c4976157..7ae08bf968ade 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -26,7 +26,6 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED; -import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 4a5a45eb1a17a..33c0a498fc449 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -737,6 +737,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING, + RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING, RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 18d4a2ca6d639..42634d2129faa 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3970,7 +3970,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro new RemoteStoreRefreshListener( this, this.checkpointPublisher, - remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()) + remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId()), + remoteStoreSettings ) ); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index bfb841307af49..20afd7b2f3568 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -33,6 +33,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.ThreadPool; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -89,11 +91,13 @@ public final class RemoteStoreRefreshListener extends ReleasableRetryableRefresh private volatile long primaryTerm; private volatile Iterator backoffDelayIterator; private final SegmentReplicationCheckpointPublisher checkpointPublisher; + private final RemoteStoreSettings remoteStoreSettings; public RemoteStoreRefreshListener( IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, - RemoteSegmentTransferTracker segmentTracker + RemoteSegmentTransferTracker segmentTracker, + RemoteStoreSettings remoteStoreSettings ) { super(indexShard.getThreadPool()); logger = Loggers.getLogger(getClass(), indexShard.shardId()); @@ -116,6 +120,7 @@ public RemoteStoreRefreshListener( this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; + this.remoteStoreSettings = remoteStoreSettings; } @Override @@ -286,7 +291,12 @@ public void onFailure(Exception e) { // Start the segments files upload uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener); - latch.await(); + if (latch.await( + remoteStoreSettings.getClusterRemoteSegmentTransferTimeout().millis(), + TimeUnit.MILLISECONDS + ) == false) { + throw new SegmentUploadFailedException("Timeout while waiting for remote segment transfer to complete"); + } } catch (EngineException e) { logger.warn("Exception while reading SegmentInfosSnapshot", e); } diff --git a/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java new file mode 100644 index 0000000000000..bbff399fb71ff --- /dev/null +++ b/server/src/main/java/org/opensearch/index/shard/SegmentUploadFailedException.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import java.io.IOException; + +/** + * Exception to be thrown when a segment upload fails. + * + * @opensearch.internal + */ +public class SegmentUploadFailedException extends IOException { + + /** + * Creates a new SegmentUploadFailedException. + * + * @param message error message + */ + public SegmentUploadFailedException(String message) { + super(message); + } +} diff --git a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java index 0bd4c7aedfc03..5234a09a58541 100644 --- a/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java +++ b/server/src/main/java/org/opensearch/indices/RemoteStoreSettings.java @@ -105,9 +105,21 @@ public class RemoteStoreSettings { Property.NodeScope ); + /** + * Controls timeout value while uploading segment files to remote segment store + */ + public static final Setting CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.segment.transfer_timeout", + TimeValue.timeValueMinutes(30), + TimeValue.timeValueMinutes(10), + Property.NodeScope, + Property.Dynamic + ); + private volatile TimeValue clusterRemoteTranslogBufferInterval; private volatile int minRemoteSegmentMetadataFiles; private volatile TimeValue clusterRemoteTranslogTransferTimeout; + private volatile TimeValue clusterRemoteSegmentTransferTimeout; private volatile RemoteStoreEnums.PathType pathType; private volatile RemoteStoreEnums.PathHashAlgorithm pathHashAlgorithm; private volatile int maxRemoteTranslogReaders; @@ -139,6 +151,12 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) { maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings); clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders); + + clusterRemoteSegmentTransferTimeout = CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING, + this::setClusterRemoteSegmentTransferTimeout + ); } public TimeValue getClusterRemoteTranslogBufferInterval() { @@ -161,10 +179,18 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() { return clusterRemoteTranslogTransferTimeout; } + public TimeValue getClusterRemoteSegmentTransferTimeout() { + return clusterRemoteSegmentTransferTimeout; + } + private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) { this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout; } + private void setClusterRemoteSegmentTransferTimeout(TimeValue clusterRemoteSegmentTransferTimeout) { + this.clusterRemoteSegmentTransferTimeout = clusterRemoteSegmentTransferTimeout; + } + @ExperimentalApi public RemoteStoreEnums.PathType getPathType() { return pathType; diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 8fbff4527ec7b..bb0776e0ced25 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -23,6 +23,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.engine.InternalEngineFactory; @@ -34,6 +35,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.indices.DefaultRemoteStoreSettings; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; @@ -91,7 +93,12 @@ public void setup(boolean primary, int numberOfDocs) throws IOException { remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, Settings.EMPTY); remoteStoreStatsTrackerFactory.afterIndexShardCreated(indexShard); RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); - remoteStoreRefreshListener = new RemoteStoreRefreshListener(indexShard, SegmentReplicationCheckpointPublisher.EMPTY, tracker); + remoteStoreRefreshListener = new RemoteStoreRefreshListener( + indexShard, + SegmentReplicationCheckpointPublisher.EMPTY, + tracker, + DefaultRemoteStoreSettings.INSTANCE + ); } private void indexDocs(int startDocId, int numberOfDocs) throws IOException { @@ -176,7 +183,12 @@ public void testRemoteDirectoryInitThrowsException() throws IOException { when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); // Since the thrown IOException is caught in the constructor, ctor should be invoked successfully. - new RemoteStoreRefreshListener(shard, SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteSegmentTransferTracker.class)); + new RemoteStoreRefreshListener( + shard, + SegmentReplicationCheckpointPublisher.EMPTY, + mock(RemoteSegmentTransferTracker.class), + DefaultRemoteStoreSettings.INSTANCE + ); // Validate that the stream of metadata file of remoteMetadataDirectory has been opened only once and the // listFilesByPrefixInLexicographicOrder has been called twice. @@ -371,6 +383,33 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } + public void testSegmentUploadTimeout() throws Exception { + // This covers the case where segment upload fails due to timeout + int succeedOnAttempt = 1; + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. + CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); + CountDownLatch successLatch = new CountDownLatch(2); + Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh( + succeedOnAttempt, + refreshCountLatch, + successLatch, + 1, + new CountDownLatch(0), + true, + true + ); + assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); + assertBusy(() -> assertEquals(1, successLatch.getCount())); + RemoteStoreStatsTrackerFactory trackerFactory = tuple.v2(); + RemoteSegmentTransferTracker segmentTracker = trackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); + assertBusy(() -> { + assertTrue(segmentTracker.getTotalUploadsFailed() > 1); + assertTrue(segmentTracker.getTotalUploadsSucceeded() < 2); + }); + // shutdown threadpool for avoid leaking threads + indexShard.getThreadPool().shutdownNow(); + } + /** * Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt. * Snapshot and metadata files created in failed attempt should not break retry. @@ -470,6 +509,7 @@ public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception { successLatch, checkpointPublishSucceedOnAttempt, reachedCheckpointPublishLatch, + false, false ); @@ -521,7 +561,8 @@ private Tuple mockIn successLatch, succeedCheckpointPublishOnAttempt, reachedCheckpointPublishLatch, - true + true, + false ); } @@ -531,7 +572,8 @@ private Tuple mockIn CountDownLatch successLatch, int succeedCheckpointPublishOnAttempt, CountDownLatch reachedCheckpointPublishLatch, - boolean mockPrimaryTerm + boolean mockPrimaryTerm, + boolean testUploadTimeout ) throws IOException { // Create index shard that we will be using to mock different methods in IndexShard for the unit test indexShard = newStartedShard( @@ -565,9 +607,22 @@ private Tuple mockIn // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) Store remoteStore = mock(Store.class); when(shard.remoteStore()).thenReturn(remoteStore); - RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = - (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) - .getDelegate(); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory; + RemoteDirectory remoteDirectory = mock(RemoteDirectory.class); + + if (testUploadTimeout) { + remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory( + remoteDirectory, + mock(RemoteDirectory.class), + mock(RemoteStoreLockManager.class), + indexShard.getThreadPool(), + indexShard.shardId + ); + } else { + remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore() + .directory()).getDelegate()).getDelegate(); + } + FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory)); when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); @@ -639,7 +694,28 @@ private Tuple mockIn RemoteStoreSettings remoteStoreSettings = mock(RemoteStoreSettings.class); when(remoteStoreSettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10); when(shard.getRemoteStoreSettings()).thenReturn(remoteStoreSettings); - RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); + if (testUploadTimeout) { + when(remoteStoreSettings.getClusterRemoteSegmentTransferTimeout()).thenReturn(TimeValue.timeValueMillis(10)); + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(5); + indexShard.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + logger.warn("copyFrom thread interrupted during sleep"); + } + actionListener.onResponse(null); + }); + return true; + }).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(ActionListener.class), any(Boolean.class)); + } + + RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener( + shard, + emptyCheckpointPublisher, + tracker, + remoteStoreSettings + ); refreshListener.afterRefresh(true); return Tuple.tuple(refreshListener, remoteStoreStatsTrackerFactory); }