@@ -9,7 +9,9 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
@@ -49,6 +51,13 @@ public Settings indexSettings() {
return defaultIndexSettings();
}

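// Indexes a single document with a random id and a random field/value pair into the given index.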
IndexResponse indexSingleDoc(String indexName) {
return client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

private Settings defaultIndexSettings() {
return Settings.builder()
.put(super.indexSettings())
@@ -0,0 +1,139 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 3)
public class RemoteStoreForceMergeIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Before
public void setup() {
setupRepo();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

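// Indexes a random number of documents per iteration (flushing or refreshing after each iteration),
// deletes the requested number of most recently indexed documents, force-merges the index down to a
// single segment, and returns the total operation count and the highest sequence number observed.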
private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs) {
long totalOperations = 0;
long maxSeqNo = -1;
List<IndexResponse> indexResponseList = new ArrayList<>();
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc(INDEX_NAME);
maxSeqNo = response.getSeqNo();
indexResponseList.add(response);
}
totalOperations += numberOfOperations;
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
if (deletedDocs == -1) {
deletedDocs = totalOperations;
}
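// Delete the requested number of documents, starting from the most recently indexed ones.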
int length = indexResponseList.size();
for (int j = 0; j < deletedDocs; j++) {
maxSeqNo = client().prepareDelete().setIndex(INDEX_NAME).setId(indexResponseList.get(length - j - 1).getId()).get().getSeqNo();
}
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(flushAfterMerge).get();
refresh(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), totalOperations - deletedDocs);
Map<String, Long> indexingStats = new HashMap<>();
indexingStats.put(TOTAL_OPERATIONS, totalOperations);
indexingStats.put(MAX_SEQ_NO_TOTAL, maxSeqNo);
return indexingStats;
}

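// Verifies that the restored index contains the expected number of documents and that indexing
// resumes with the next sequence number after the restore.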
private void verifyRestoredData(Map<String, Long> indexStats, long deletedDocs) {
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) - deletedDocs);
IndexResponse response = indexSingleDoc(INDEX_NAME);
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1 - deletedDocs);
}

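// Indexes and force-merges data, stops the primary node, closes the index, restores it from the
// remote store, and verifies the restored data.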
private void testRestoreWithMergeFlow(int numberOfIterations, boolean invokeFlush, boolean flushAfterMerge, long deletedDocs)
throws IOException {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, flushAfterMerge, deletedDocs);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
ensureGreen(INDEX_NAME);

if (deletedDocs == -1) {
verifyRestoredData(indexStats, indexStats.get(TOTAL_OPERATIONS));
} else {
verifyRestoredData(indexStats, deletedDocs);
}
}

// The following integ tests use randomBoolean to control the number of integ tests. If we used separate
// values for each of the flags, the number of integ tests would grow to 16 from the current 2.
// We have run all 16 tests locally and they pass.
public void testRestoreForceMergeSingleIteration() throws IOException {
boolean invokeFlush = randomBoolean();
boolean flushAfterMerge = randomBoolean();
testRestoreWithMergeFlow(1, invokeFlush, flushAfterMerge, randomIntBetween(0, 10));
}

public void testRestoreForceMergeMultipleIterations() throws IOException {
boolean invokeFlush = randomBoolean();
boolean flushAfterMerge = randomBoolean();
testRestoreWithMergeFlow(randomIntBetween(2, 5), invokeFlush, flushAfterMerge, randomIntBetween(0, 10));
}

public void testRestoreForceMergeMultipleIterationsDeleteAll() throws IOException {
boolean invokeFlush = randomBoolean();
boolean flushAfterMerge = randomBoolean();
testRestoreWithMergeFlow(randomIntBetween(2, 3), invokeFlush, flushAfterMerge, -1);
}
}
@@ -12,10 +12,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.UUIDs;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -128,7 +126,7 @@ private void indexDocs() {
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
indexSingleDoc(INDEX_NAME);
}
}
}
@@ -149,12 +147,4 @@ private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) {
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

}
@@ -12,7 +12,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
@@ -40,13 +39,10 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -216,77 +212,50 @@ private synchronized boolean syncSegments() {
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));

if (latestSegmentInfos.isPresent()) {
// SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain
// all the segments from last commit if they are merged away but not yet committed.
// Each metadata file in the remote segment store represents a commit and the following
// statement keeps sure that each metadata will always contain all the segments from last commit + refreshed
// segments.
SegmentInfos segmentCommitInfos;
try {
segmentCommitInfos = SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get());
} catch (Exception e) {
// Seeing discrepancy in segment infos and files on disk. SegmentInfosSnapshot is returning
// a segment_N file which does not exist on local disk.
logger.error("Exception occurred while SegmentInfos.readCommit(..)", e);
logger.error("segmentInfosFiles={} diskFiles={}", localSegmentsPostRefresh, storeDirectory.listAll());
throw e;
}
localSegmentsPostRefresh.addAll(segmentCommitInfos.files(true));
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(localSegmentsPostRefresh::remove);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
successful.set(true);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log
// integration.
logger.warn("Exception in post new segment upload actions", e);
}
// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
clearStaleFilesFromLocalSegmentChecksumMap(localSegmentsPostRefresh);
onSuccessfulSegmentsSync(
refreshTimeMs,
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
successful.set(true);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// as part of exponential back-off retry logic. This should not affect durability of the indexed data
// with remote trans-log integration.
logger.warn("Exception in post new segment upload actions", e);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
}, latch);
@Override
public void onFailure(Exception e) {
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
}, latch);

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
latch.await();
}
// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
latch.await();
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
// as part of exponential back-off retry logic. This should not affect durability of the indexed data
// with remote trans-log integration.
logger.warn("Exception while uploading new segments to the remote segment store", e);
}
} catch (Throwable t) {