Skip to content

Commit 73e69d0

Browse files
committed
Add metadata prefix to Remote Translog Metadata file
Signed-off-by: Gaurav Bafna <[email protected]>
1 parent 3952d5e commit 73e69d0

File tree

6 files changed

+45
-19
lines changed

6 files changed

+45
-19
lines changed

server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,18 @@ public void listFoldersAsync(String threadpoolName, Iterable<String> path, Actio
213213
});
214214
}
215215

216-
public void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener) {
217-
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder("", limit, LEXICOGRAPHIC, listener);
216+
public void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener) {
217+
blobStore.blobContainer((BlobPath) path).listBlobsByPrefixInSortedOrder(filenamePrefix, limit, LEXICOGRAPHIC, listener);
218218
}
219219

220220
public void listAllInSortedOrderAsync(
221221
String threadpoolName,
222222
Iterable<String> path,
223+
String filenamePrefix,
223224
int limit,
224225
ActionListener<List<BlobMetadata>> listener
225226
) {
226-
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, limit, listener); });
227+
threadPool.executor(threadpoolName).execute(() -> { listAllInSortedOrder(path, filenamePrefix, limit, listener); });
227228
}
228229

229230
}

server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,14 @@ void uploadBlobs(
125125
*/
126126
InputStream downloadBlob(Iterable<String> path, String fileName) throws IOException;
127127

128-
void listAllInSortedOrder(Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);
129-
130-
void listAllInSortedOrderAsync(String threadpoolName, Iterable<String> path, int limit, ActionListener<List<BlobMetadata>> listener);
128+
void listAllInSortedOrder(Iterable<String> path, String filenamePrefix, int limit, ActionListener<List<BlobMetadata>> listener);
129+
130+
void listAllInSortedOrderAsync(
131+
String threadpoolName,
132+
Iterable<String> path,
133+
String filenamePrefix,
134+
int limit,
135+
ActionListener<List<BlobMetadata>> listener
136+
);
131137

132138
}

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,12 @@ public TranslogTransferMetadata readMetadata() throws IOException {
210210
);
211211

212212
try {
213-
transferService.listAllInSortedOrder(remoteMetadataTransferPath, 1, latchedActionListener);
213+
transferService.listAllInSortedOrder(
214+
remoteMetadataTransferPath,
215+
TranslogTransferMetadata.METADATA_PREFIX,
216+
1,
217+
latchedActionListener
218+
);
214219
latch.await();
215220
} catch (InterruptedException e) {
216221
throw new IOException("Exception while reading/downloading metadafile", e);
@@ -367,6 +372,7 @@ public void deleteStaleTranslogMetadataFilesAsync(Runnable onCompletion) {
367372
transferService.listAllInSortedOrderAsync(
368373
ThreadPool.Names.REMOTE_PURGE,
369374
remoteMetadataTransferPath,
375+
TranslogTransferMetadata.METADATA_PREFIX,
370376
Integer.MAX_VALUE,
371377
new ActionListener<>() {
372378
@Override

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class TranslogTransferMetadata {
3636

3737
public static final String METADATA_SEPARATOR = "__";
3838

39+
public static final String METADATA_PREFIX = "metadata";
40+
3941
static final int BUFFER_SIZE = 4096;
4042

4143
static final int CURRENT_VERSION = 1;
@@ -83,6 +85,7 @@ public String getFileName() {
8385
return String.join(
8486
METADATA_SEPARATOR,
8587
Arrays.asList(
88+
METADATA_PREFIX,
8689
RemoteStoreUtils.invertLong(primaryTerm),
8790
RemoteStoreUtils.invertLong(generation),
8891
RemoteStoreUtils.invertLong(createdAt),

server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import static org.mockito.ArgumentMatchers.anyInt;
4545
import static org.mockito.ArgumentMatchers.anyMap;
4646
import static org.mockito.ArgumentMatchers.anySet;
47+
import static org.mockito.ArgumentMatchers.anyString;
4748
import static org.mockito.Mockito.any;
4849
import static org.mockito.Mockito.doAnswer;
4950
import static org.mockito.Mockito.doNothing;
@@ -205,11 +206,12 @@ public void testReadMetadataNoFile() throws IOException {
205206
null
206207
);
207208
doAnswer(invocation -> {
208-
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
209+
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
209210
List<BlobMetadata> bmList = new LinkedList<>();
210211
latchedActionListener.onResponse(bmList);
211212
return null;
212-
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
213+
}).when(transferService)
214+
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));
213215

214216
assertNull(translogTransferManager.readMetadata());
215217
}
@@ -225,12 +227,13 @@ public void testReadMetadataSingleFile() throws IOException {
225227
TranslogTransferMetadata tm = new TranslogTransferMetadata(1, 1, 1, 2);
226228
String mdFilename = tm.getFileName();
227229
doAnswer(invocation -> {
228-
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
230+
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
229231
List<BlobMetadata> bmList = new LinkedList<>();
230232
bmList.add(new PlainBlobMetadata(mdFilename, 1));
231233
latchedActionListener.onResponse(bmList);
232234
return null;
233-
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
235+
}).when(transferService)
236+
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));
234237

235238
TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
236239
when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenReturn(
@@ -252,12 +255,13 @@ public void testReadMetadataReadException() throws IOException {
252255
String mdFilename = tm.getFileName();
253256

254257
doAnswer(invocation -> {
255-
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
258+
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
256259
List<BlobMetadata> bmList = new LinkedList<>();
257260
bmList.add(new PlainBlobMetadata(mdFilename, 1));
258261
latchedActionListener.onResponse(bmList);
259262
return null;
260-
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
263+
}).when(transferService)
264+
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));
261265

262266
when(transferService.downloadBlob(any(BlobPath.class), eq(mdFilename))).thenThrow(new IOException("Something went wrong"));
263267

@@ -283,10 +287,11 @@ public void testReadMetadataListException() throws IOException {
283287
);
284288

285289
doAnswer(invocation -> {
286-
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(2);
290+
LatchedActionListener<List<BlobMetadata>> latchedActionListener = invocation.getArgument(3);
287291
latchedActionListener.onFailure(new IOException("Issue while listing"));
288292
return null;
289-
}).when(transferService).listAllInSortedOrder(any(BlobPath.class), anyInt(), any(ActionListener.class));
293+
}).when(transferService)
294+
.listAllInSortedOrder(any(BlobPath.class), eq(TranslogTransferMetadata.METADATA_PREFIX), anyInt(), any(ActionListener.class));
290295

291296
when(transferService.downloadBlob(any(BlobPath.class), any(String.class))).thenThrow(new IOException("Something went wrong"));
292297

@@ -416,20 +421,27 @@ public void testDeleteStaleTranslogMetadata() {
416421
String tm2 = new TranslogTransferMetadata(1, 2, 1, 2).getFileName();
417422
String tm3 = new TranslogTransferMetadata(2, 3, 1, 2).getFileName();
418423
doAnswer(invocation -> {
419-
ActionListener<List<BlobMetadata>> actionListener = invocation.getArgument(3);
424+
ActionListener<List<BlobMetadata>> actionListener = invocation.getArgument(4);
420425
List<BlobMetadata> bmList = new LinkedList<>();
421426
bmList.add(new PlainBlobMetadata(tm1, 1));
422427
bmList.add(new PlainBlobMetadata(tm2, 1));
423428
bmList.add(new PlainBlobMetadata(tm3, 1));
424429
actionListener.onResponse(bmList);
425430
return null;
426431
}).when(transferService)
427-
.listAllInSortedOrderAsync(eq(ThreadPool.Names.REMOTE_PURGE), any(BlobPath.class), anyInt(), any(ActionListener.class));
432+
.listAllInSortedOrderAsync(
433+
eq(ThreadPool.Names.REMOTE_PURGE),
434+
any(BlobPath.class),
435+
anyString(),
436+
anyInt(),
437+
any(ActionListener.class)
438+
);
428439
List<String> files = List.of(tm2, tm3);
429440
translogTransferManager.deleteStaleTranslogMetadataFilesAsync(() -> {
430441
verify(transferService).listAllInSortedOrderAsync(
431442
eq(ThreadPool.Names.REMOTE_PURGE),
432443
any(BlobPath.class),
444+
eq(TranslogTransferMetadata.METADATA_PREFIX),
433445
eq(Integer.MAX_VALUE),
434446
any()
435447
);

server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
package org.opensearch.repositories.blobstore;
3434

35-
import org.apache.lucene.tests.util.LuceneTestCase;
3635
import org.opensearch.Version;
3736
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
3837
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
@@ -91,7 +90,6 @@
9190
/**
9291
* Tests for the {@link BlobStoreRepository} and its subclasses.
9392
*/
94-
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
9593
public class BlobStoreRepositoryTests extends OpenSearchSingleNodeTestCase {
9694

9795
static final String REPO_TYPE = "fsLike";

0 commit comments

Comments
 (0)