Skip to content

Commit a2a04c1

Browse files
author
Aditi Goyal
committed
Updated the code to include priority upload as paramter
1 parent 8ad3891 commit a2a04c1

File tree

3 files changed

+68
-118
lines changed

3 files changed

+68
-118
lines changed

server/src/main/java/org/opensearch/index/shard/RemoteStoreUploader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
import java.util.function.Function;
1717

1818
/**
19-
* Interface to handle the functionality for upload data in the remote store
19+
* Interface to handle the functionality for uploading data in the remote store
2020
*/
2121
public interface RemoteStoreUploader {
2222

2323
void uploadSegments(
2424
Collection<String> localSegments,
2525
Map<String, Long> localSegmentsSizeMap,
2626
ActionListener<Void> listener,
27-
Function<Map<String, Long>, UploadListener> uploadListenerFunction
27+
Function<Map<String, Long>, UploadListener> uploadListenerFunction,
28+
boolean isLowPriorityUpload
2829
);
2930
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreUploaderService.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.apache.lucene.store.FilterDirectory;
1616
import org.apache.lucene.store.IOContext;
1717
import org.opensearch.action.support.GroupedActionListener;
18-
import org.opensearch.cluster.routing.RecoverySource;
1918
import org.opensearch.common.logging.Loggers;
2019
import org.opensearch.common.util.UploadListener;
2120
import org.opensearch.core.action.ActionListener;
@@ -50,7 +49,8 @@ public void uploadSegments(
5049
Collection<String> localSegments,
5150
Map<String, Long> localSegmentsSizeMap,
5251
ActionListener<Void> listener,
53-
Function<Map<String, Long>, UploadListener> uploadListenerFunction
52+
Function<Map<String, Long>, UploadListener> uploadListenerFunction,
53+
boolean isLowPriorityUpload
5454
) {
5555
if (localSegments.isEmpty()) {
5656
logger.debug("No new segments to upload in uploadNewSegments");
@@ -83,22 +83,7 @@ public void uploadSegments(
8383
});
8484
statsListener.beforeUpload(localSegment);
8585
// Place where the actual upload is happening
86-
remoteDirectory.copyFrom(storeDirectory, localSegment, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload());
86+
remoteDirectory.copyFrom(storeDirectory, localSegment, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload);
8787
}
8888
}
89-
90-
boolean isLowPriorityUpload() {
91-
return isLocalOrSnapshotRecoveryOrSeeding();
92-
}
93-
94-
boolean isLocalOrSnapshotRecoveryOrSeeding() {
95-
// In this case when the primary mode is false, we need to upload segments to Remote Store
96-
// This is required in case of remote migration seeding/snapshots/shrink/ split/clone where we need to durable persist
97-
// all segments to remote before completing the recovery to ensure durability.
98-
return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary())
99-
&& indexShard.recoveryState() != null
100-
&& (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
101-
|| indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT
102-
|| indexShard.shouldSeedRemoteStore());
103-
}
10489
}

server/src/test/java/org/opensearch/index/shard/RemoteStoreUploaderServiceTests.java

Lines changed: 62 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.apache.lucene.index.CorruptIndexException;
1212
import org.apache.lucene.store.Directory;
1313
import org.apache.lucene.store.FilterDirectory;
14-
import org.opensearch.cluster.routing.RecoverySource;
1514
import org.opensearch.cluster.routing.ShardRouting;
1615
import org.opensearch.common.util.UploadListener;
1716
import org.opensearch.core.action.ActionListener;
@@ -21,7 +20,6 @@
2120
import org.opensearch.index.store.RemoteDirectory;
2221
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
2322
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
24-
import org.opensearch.indices.recovery.RecoveryState;
2523
import org.opensearch.test.OpenSearchTestCase;
2624
import org.opensearch.threadpool.ThreadPool;
2725

@@ -102,12 +100,12 @@ public void testUploadSegmentsWithEmptyCollection() throws Exception {
102100
exception -> fail("Should not fail for empty segments")
103101
);
104102

105-
uploaderService.uploadSegments(emptySegments, segmentSizeMap, listener, mockUploadListenerFunction);
103+
uploaderService.uploadSegments(emptySegments, segmentSizeMap, listener, mockUploadListenerFunction, false);
106104

107105
assertTrue(latch.await(1, TimeUnit.SECONDS));
108106
}
109107

110-
public void testUploadSegmentsSuccess() throws Exception {
108+
public void testUploadSegmentsSuccessWithHighPriorityUpload() throws Exception {
111109
Collection<String> segments = Arrays.asList("segment1", "segment2");
112110
Map<String, Long> segmentSizeMap = new HashMap<>();
113111
segmentSizeMap.put("segment1", 100L);
@@ -155,7 +153,63 @@ public void testUploadSegmentsSuccess() throws Exception {
155153
exception -> fail("Upload should succeed: " + exception.getMessage())
156154
);
157155

158-
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction);
156+
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction, false);
157+
158+
assertTrue(latch.await(5, TimeUnit.SECONDS));
159+
// Verify the upload listener was called correctly
160+
verify(mockUploadListener, times(2)).beforeUpload(any(String.class));
161+
verify(mockUploadListener, times(2)).onSuccess(any(String.class));
162+
}
163+
164+
public void testUploadSegmentsSuccessWithLowPriorityUpload() throws Exception {
165+
Collection<String> segments = Arrays.asList("segment1", "segment2");
166+
Map<String, Long> segmentSizeMap = new HashMap<>();
167+
segmentSizeMap.put("segment1", 100L);
168+
segmentSizeMap.put("segment2", 200L);
169+
170+
// Create a fresh mock IndexShard
171+
IndexShard freshMockShard = mock(IndexShard.class);
172+
ShardId shardId = new ShardId(new Index("test", "test"), 1);
173+
when(freshMockShard.shardId()).thenReturn(shardId);
174+
when(freshMockShard.state()).thenReturn(IndexShardState.STARTED);
175+
176+
// Create a mock directory structure that matches what the code expects
177+
Directory innerMockDelegate = mock(Directory.class);
178+
FilterDirectory innerFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(innerMockDelegate));
179+
180+
FilterDirectory outerFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(innerFilterDirectory));
181+
182+
// Setup the real RemoteSegmentStoreDirectory to handle copyFrom calls
183+
RemoteDirectory remoteDirectory = mock(RemoteDirectory.class);
184+
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(
185+
remoteDirectory,
186+
mock(RemoteDirectory.class),
187+
mock(RemoteStoreLockManager.class),
188+
freshMockShard.getThreadPool(),
189+
freshMockShard.shardId()
190+
);
191+
192+
// Create a new uploader service with the fresh mocks
193+
RemoteStoreUploaderService testUploaderService = new RemoteStoreUploaderService(
194+
freshMockShard,
195+
outerFilterDirectory,
196+
remoteSegmentStoreDirectory
197+
);
198+
199+
doAnswer(invocation -> {
200+
ActionListener<Void> callback = invocation.getArgument(5);
201+
callback.onResponse(null);
202+
return true;
203+
}).when(remoteDirectory).copyFrom(any(), any(), any(), any(), any(), any(), any(Boolean.class));
204+
205+
CountDownLatch latch = new CountDownLatch(1);
206+
207+
ActionListener<Void> listener = ActionListener.wrap(
208+
response -> latch.countDown(),
209+
exception -> fail("Upload should succeed: " + exception.getMessage())
210+
);
211+
212+
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction, true);
159213

160214
assertTrue(latch.await(5, TimeUnit.SECONDS));
161215
// Verify the upload listener was called correctly
@@ -213,7 +267,7 @@ public void testUploadSegmentsWithCompositeDirectory() throws Exception {
213267
exception -> fail("Upload should succeed: " + exception.getMessage())
214268
);
215269

216-
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction);
270+
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction, false);
217271

218272
assertTrue(latch.await(5, TimeUnit.SECONDS));
219273
verify(mockCompositeDirectory).afterSyncToRemote("segment1");
@@ -272,7 +326,7 @@ public void testUploadSegmentsWithCorruptIndexException() throws Exception {
272326
latch.countDown();
273327
});
274328

275-
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction);
329+
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction, false);
276330

277331
assertTrue(latch.await(5, TimeUnit.SECONDS));
278332
verify(freshMockShard).failShard(eq("Index corrupted (resource=test)"), eq(corruptException));
@@ -331,103 +385,13 @@ public void testUploadSegmentsWithGenericException() throws Exception {
331385
latch.countDown();
332386
});
333387

334-
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction);
388+
testUploaderService.uploadSegments(segments, segmentSizeMap, listener, mockUploadListenerFunction, false);
335389

336390
assertTrue(latch.await(5, TimeUnit.SECONDS));
337391
verify(freshMockShard, never()).failShard(any(), any());
338392
verify(mockUploadListener).onFailure("segment1");
339393
}
340394

341-
public void testIsLowPriorityUpload() {
342-
when(mockIndexShard.state()).thenReturn(IndexShardState.RECOVERING);
343-
344-
ShardRouting mockShardRouting = mock(ShardRouting.class);
345-
mockIndexShard.shardRouting = mockShardRouting;
346-
when(mockShardRouting.primary()).thenReturn(true);
347-
348-
RecoveryState mockRecoveryState = mock(RecoveryState.class);
349-
RecoverySource mockRecoverySource = mock(RecoverySource.class);
350-
when(mockRecoverySource.getType()).thenReturn(RecoverySource.Type.LOCAL_SHARDS);
351-
when(mockRecoveryState.getRecoverySource()).thenReturn(mockRecoverySource);
352-
when(mockIndexShard.recoveryState()).thenReturn(mockRecoveryState);
353-
354-
assertTrue(uploaderService.isLowPriorityUpload());
355-
}
356-
357-
public void testIsLocalOrSnapshotRecoveryOrSeedingWithLocalShards() {
358-
when(mockIndexShard.state()).thenReturn(IndexShardState.RECOVERING);
359-
ShardRouting mockShardRouting = mock(ShardRouting.class);
360-
mockIndexShard.shardRouting = mockShardRouting;
361-
when(mockShardRouting.primary()).thenReturn(true);
362-
363-
RecoveryState mockRecoveryState = mock(RecoveryState.class);
364-
RecoverySource mockRecoverySource = mock(RecoverySource.class);
365-
when(mockRecoverySource.getType()).thenReturn(RecoverySource.Type.LOCAL_SHARDS);
366-
when(mockRecoveryState.getRecoverySource()).thenReturn(mockRecoverySource);
367-
when(mockIndexShard.recoveryState()).thenReturn(mockRecoveryState);
368-
369-
assertTrue(uploaderService.isLocalOrSnapshotRecoveryOrSeeding());
370-
}
371-
372-
public void testIsLocalOrSnapshotRecoveryOrSeedingWithSnapshot() {
373-
when(mockIndexShard.state()).thenReturn(IndexShardState.RECOVERING);
374-
ShardRouting mockShardRouting = mock(ShardRouting.class);
375-
mockIndexShard.shardRouting = mockShardRouting;
376-
when(mockShardRouting.primary()).thenReturn(true);
377-
378-
RecoveryState mockRecoveryState = mock(RecoveryState.class);
379-
RecoverySource mockRecoverySource = mock(RecoverySource.class);
380-
when(mockRecoverySource.getType()).thenReturn(RecoverySource.Type.SNAPSHOT);
381-
when(mockRecoveryState.getRecoverySource()).thenReturn(mockRecoverySource);
382-
when(mockIndexShard.recoveryState()).thenReturn(mockRecoveryState);
383-
384-
assertTrue(uploaderService.isLocalOrSnapshotRecoveryOrSeeding());
385-
}
386-
387-
public void testIsLocalOrSnapshotRecoveryOrSeedingWithSeeding() {
388-
when(mockIndexShard.state()).thenReturn(IndexShardState.RECOVERING);
389-
ShardRouting mockShardRouting = mock(ShardRouting.class);
390-
mockIndexShard.shardRouting = mockShardRouting;
391-
when(mockShardRouting.primary()).thenReturn(true);
392-
393-
RecoveryState mockRecoveryState = mock(RecoveryState.class);
394-
RecoverySource mockRecoverySource = mock(RecoverySource.class);
395-
when(mockRecoverySource.getType()).thenReturn(RecoverySource.Type.PEER);
396-
when(mockRecoveryState.getRecoverySource()).thenReturn(mockRecoverySource);
397-
when(mockIndexShard.recoveryState()).thenReturn(mockRecoveryState);
398-
when(mockIndexShard.shouldSeedRemoteStore()).thenReturn(true);
399-
400-
assertTrue(uploaderService.isLocalOrSnapshotRecoveryOrSeeding());
401-
}
402-
403-
public void testIsLocalOrSnapshotRecoveryOrSeedingReturnsFalse() {
404-
when(mockIndexShard.state()).thenReturn(IndexShardState.STARTED);
405-
ShardRouting mockShardRouting = mock(ShardRouting.class);
406-
mockIndexShard.shardRouting = mockShardRouting;
407-
when(mockShardRouting.primary()).thenReturn(true);
408-
409-
assertFalse(uploaderService.isLocalOrSnapshotRecoveryOrSeeding());
410-
}
411-
412-
public void testIsLocalOrSnapshotRecoveryOrSeedingWithNonPrimary() {
413-
when(mockIndexShard.state()).thenReturn(IndexShardState.RECOVERING);
414-
ShardRouting mockShardRouting = mock(ShardRouting.class);
415-
mockIndexShard.shardRouting = mockShardRouting;
416-
when(mockShardRouting.primary()).thenReturn(true);
417-
418-
assertFalse(uploaderService.isLocalOrSnapshotRecoveryOrSeeding());
419-
}
420-
421-
public void testIsLocalOrSnapshotRecoveryOrSeedingWithNullRecoveryState() {
422-
when(mockIndexShard.state()).thenReturn(IndexShardState.RECOVERING);
423-
ShardRouting mockShardRouting = mock(ShardRouting.class);
424-
mockIndexShard.shardRouting = mockShardRouting;
425-
when(mockShardRouting.primary()).thenReturn(true);
426-
when(mockIndexShard.recoveryState()).thenReturn(null);
427-
428-
assertFalse(uploaderService.isLocalOrSnapshotRecoveryOrSeeding());
429-
}
430-
431395
public static class TestFilterDirectory extends FilterDirectory {
432396

433397
/**

0 commit comments

Comments
 (0)