Skip to content

Commit cbb7f29

Browse files
sachinpkale authored and dk2k committed
Optimize remote store GC flow with pinned timestamps (opensearch-project#15943)
Signed-off-by: Sachin Kale <[email protected]>
1 parent 56042eb commit cbb7f29

File tree

11 files changed

+854
-227
lines changed

11 files changed

+854
-227
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java

+433
Large diffs are not rendered by default.

server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java

+110-121
Large diffs are not rendered by default.

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
854854
}
855855

856856
// Check last fetch status of pinned timestamps. If stale, return.
857-
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
857+
if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) {
858858
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
859859
return;
860860
}
@@ -994,7 +994,8 @@ public static void remoteDirectoryCleanup(
994994
String remoteStoreRepoForIndex,
995995
String indexUUID,
996996
ShardId shardId,
997-
RemoteStorePathStrategy pathStrategy
997+
RemoteStorePathStrategy pathStrategy,
998+
boolean forceClean
998999
) {
9991000
try {
10001001
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
@@ -1003,8 +1004,12 @@ public static void remoteDirectoryCleanup(
10031004
shardId,
10041005
pathStrategy
10051006
);
1006-
remoteSegmentStoreDirectory.deleteStaleSegments(0);
1007-
remoteSegmentStoreDirectory.deleteIfEmpty();
1007+
if (forceClean) {
1008+
remoteSegmentStoreDirectory.delete();
1009+
} else {
1010+
remoteSegmentStoreDirectory.deleteStaleSegments(0);
1011+
remoteSegmentStoreDirectory.deleteIfEmpty();
1012+
}
10081013
} catch (Exception e) {
10091014
staticLogger.error("Exception occurred while deleting directory", e);
10101015
}
@@ -1023,7 +1028,10 @@ private boolean deleteIfEmpty() throws IOException {
10231028
logger.info("Remote directory still has files, not deleting the path");
10241029
return false;
10251030
}
1031+
return delete();
1032+
}
10261033

1034+
private boolean delete() {
10271035
try {
10281036
remoteDataDirectory.delete();
10291037
remoteMetadataDirectory.delete();

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

+110-61
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Optional;
3535
import java.util.Set;
3636
import java.util.TreeSet;
37+
import java.util.concurrent.atomic.AtomicBoolean;
3738
import java.util.concurrent.atomic.AtomicLong;
3839
import java.util.function.BooleanSupplier;
3940
import java.util.function.LongConsumer;
@@ -61,6 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
6162
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
6263
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
6364
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
65+
private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false);
6466

6567
public RemoteFsTimestampAwareTranslog(
6668
TranslogConfig config,
@@ -105,6 +107,11 @@ protected void onDelete() {
105107
}
106108
}
107109

110+
@Override
111+
protected void onMinRemoteGenReferencedChange() {
112+
triggerTrimOnMinRemoteGenReferencedChange.set(true);
113+
}
114+
108115
@Override
109116
public void trimUnreferencedReaders() throws IOException {
110117
trimUnreferencedReaders(false, true);
@@ -135,14 +142,22 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)
135142

136143
// This ensures that, once the permits are acquired during primary relocation, no further modifications are made to the remote
137144
// store.
138-
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
145+
if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) {
139146
return;
140147
}
141148

142149
// This is to fail fast and avoid listing metadata files unnecessarily.
143150
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
144-
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
151+
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
152+
return;
153+
}
154+
155+
// This code block ensures parity with RemoteFsTranslog. Without it, we would end up making a list-translog-metadata
156+
// call on every invocation of trimUnreferencedReaders.
157+
if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) {
145158
return;
159+
} else if (triggerTrimOnMinRemoteGenReferencedChange.get()) {
160+
triggerTrimOnMinRemoteGenReferencedChange.set(false);
146161
}
147162

148163
// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
@@ -158,24 +173,20 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
158173
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
159174

160175
try {
161-
if (metadataFiles.size() <= 1) {
176+
if (indexDeleted == false && metadataFiles.size() <= 1) {
162177
logger.debug("No stale translog metadata files found");
163178
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
164179
return;
165180
}
166181

167182
// Check last fetch status of pinned timestamps. If stale, return.
168183
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
169-
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
184+
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
170185
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
171186
return;
172187
}
173188

174-
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
175-
metadataFiles,
176-
metadataFilePinnedTimestampMap,
177-
logger
178-
);
189+
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted);
179190

180191
// If index is not deleted, make sure to keep latest metadata file
181192
if (indexDeleted == false) {
@@ -194,10 +205,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
194205
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
195206

196207
logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
208+
197209
Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
198210
metadataFilesNotToBeDeleted,
199211
metadataFilesToBeDeleted,
200-
indexDeleted
212+
indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeepInRemote()
201213
);
202214

203215
logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
@@ -208,7 +220,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
208220
generationsToBeDeleted,
209221
remoteGenerationDeletionPermits::release
210222
);
223+
} else {
224+
remoteGenerationDeletionPermits.release();
225+
}
211226

227+
if (metadataFilesToBeDeleted.isEmpty() == false) {
212228
// Delete stale metadata files
213229
translogTransferManager.deleteMetadataFilesAsync(
214230
metadataFilesToBeDeleted,
@@ -217,11 +233,10 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
217233

218234
// Update cache to keep only those metadata files that are not getting deleted
219235
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);
220-
221236
// Delete stale primary terms
222237
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
223238
} else {
224-
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
239+
remoteGenerationDeletionPermits.release();
225240
}
226241
} catch (Exception e) {
227242
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
@@ -237,18 +252,16 @@ public void onFailure(Exception e) {
237252
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
238253
}
239254

255+
private long getMinGenerationToKeepInRemote() {
256+
return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep();
257+
}
258+
240259
// Visible for testing
241260
protected Set<Long> getGenerationsToBeDeleted(
242261
List<String> metadataFilesNotToBeDeleted,
243262
List<String> metadataFilesToBeDeleted,
244-
boolean indexDeleted
263+
long minGenerationToKeepInRemote
245264
) throws IOException {
246-
long maxGenerationToBeDeleted = Long.MAX_VALUE;
247-
248-
if (indexDeleted == false) {
249-
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
250-
}
251-
252265
Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
253266
for (String mdFile : metadataFilesToBeDeleted) {
254267
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
@@ -262,21 +275,31 @@ protected Set<Long> getGenerationsToBeDeleted(
262275
Set<Long> generationsToBeDeleted = new HashSet<>();
263276
for (long generation : generationsFromMetadataFilesToBeDeleted) {
264277
// Check if the generation is not referred by metadata file matching pinned timestamps
265-
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
278+
// The check against minGenerationToKeepInRemote is redundant, but it is kept to make sure we don't delete generations
279+
// that are not yet persisted in the remote segment store.
280+
if (generation < minGenerationToKeepInRemote && isGenerationPinned(generation, pinnedGenerations) == false) {
266281
generationsToBeDeleted.add(generation);
267282
}
268283
}
269284
return generationsToBeDeleted;
270285
}
271286

272-
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
273-
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
287+
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
288+
return getMetadataFilesToBeDeleted(
289+
metadataFiles,
290+
metadataFilePinnedTimestampMap,
291+
getMinGenerationToKeepInRemote(),
292+
indexDeleted,
293+
logger
294+
);
274295
}
275296

276297
// Visible for testing
277298
protected static List<String> getMetadataFilesToBeDeleted(
278299
List<String> metadataFiles,
279300
Map<Long, String> metadataFilePinnedTimestampMap,
301+
long minGenerationToKeepInRemote,
302+
boolean indexDeleted,
280303
Logger logger
281304
) {
282305
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
@@ -312,6 +335,22 @@ protected static List<String> getMetadataFilesToBeDeleted(
312335
metadataFilesToBeDeleted.size()
313336
);
314337

338+
if (indexDeleted == false) {
339+
// Filter out metadata files based on minGenerationToKeep
340+
List<String> metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> {
341+
long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md);
342+
return maxGeneration == -1 || maxGeneration >= minGenerationToKeepInRemote;
343+
}).collect(Collectors.toList());
344+
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep);
345+
346+
logger.trace(
347+
"metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}",
348+
metadataFilesContainingMinGenerationToKeep.size(),
349+
metadataFilesToBeDeleted.size(),
350+
minGenerationToKeepInRemote
351+
);
352+
}
353+
315354
return metadataFilesToBeDeleted;
316355
}
317356

@@ -472,50 +511,60 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
472511
}
473512
}
474513

475-
public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
476-
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
477-
@Override
478-
public void onResponse(List<BlobMetadata> blobMetadata) {
479-
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
514+
public static void cleanupOfDeletedIndex(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException {
515+
if (forceClean) {
516+
translogTransferManager.delete();
517+
} else {
518+
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
519+
@Override
520+
public void onResponse(List<BlobMetadata> blobMetadata) {
521+
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
522+
523+
try {
524+
if (metadataFiles.isEmpty()) {
525+
staticLogger.debug("No stale translog metadata files found");
526+
return;
527+
}
528+
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
529+
metadataFiles,
530+
new HashMap<>(),
531+
Long.MAX_VALUE,
532+
true, // This method gets called when the index is no longer present
533+
staticLogger
534+
);
535+
if (metadataFilesToBeDeleted.isEmpty()) {
536+
staticLogger.debug("No metadata files to delete");
537+
return;
538+
}
539+
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
480540

481-
try {
482-
if (metadataFiles.isEmpty()) {
483-
staticLogger.debug("No stale translog metadata files found");
484-
return;
485-
}
486-
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
487-
if (metadataFilesToBeDeleted.isEmpty()) {
488-
staticLogger.debug("No metadata files to delete");
489-
return;
490-
}
491-
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
541+
// For all the files that we are keeping, fetch min and max generations
542+
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
543+
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
544+
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
492545

493-
// For all the files that we are keeping, fetch min and max generations
494-
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
495-
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
496-
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
546+
// Delete stale metadata files
547+
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
497548

498-
// Delete stale metadata files
499-
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
549+
// Delete stale primary terms
550+
deleteStaleRemotePrimaryTerms(
551+
metadataFilesNotToBeDeleted,
552+
translogTransferManager,
553+
new HashMap<>(),
554+
new AtomicLong(Long.MAX_VALUE),
555+
staticLogger
556+
);
557+
} catch (Exception e) {
558+
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
559+
}
560+
}
500561

501-
// Delete stale primary terms
502-
deleteStaleRemotePrimaryTerms(
503-
metadataFilesNotToBeDeleted,
504-
translogTransferManager,
505-
new HashMap<>(),
506-
new AtomicLong(Long.MAX_VALUE),
507-
staticLogger
508-
);
509-
} catch (Exception e) {
562+
@Override
563+
public void onFailure(Exception e) {
510564
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
511565
}
512-
}
513-
514-
@Override
515-
public void onFailure(Exception e) {
516-
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
517-
}
518-
};
519-
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
566+
};
567+
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
568+
}
520569
}
521570
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -683,12 +683,17 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
683683
@Override
684684
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
685685
maxRemoteTranslogGenerationUploaded = generation;
686+
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
686687
minRemoteGenReferenced = getMinFileGeneration();
688+
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
689+
onMinRemoteGenReferencedChange();
690+
}
687691
logger.debug(
688-
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}",
692+
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}",
689693
primaryTerm,
690694
generation,
691-
maxSeqNo
695+
maxSeqNo,
696+
minRemoteGenReferenced
692697
);
693698
}
694699

@@ -702,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
702707
}
703708
}
704709

710+
protected void onMinRemoteGenReferencedChange() {
711+
712+
}
713+
705714
@Override
706715
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
707716
return minSeqNoToKeep;

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java

+10
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,16 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
170170
}
171171
}
172172

173+
public static long getMaxGenerationFromFileName(String filename) {
174+
String[] tokens = filename.split(METADATA_SEPARATOR);
175+
try {
176+
return RemoteStoreUtils.invertLong(tokens[2]);
177+
} catch (Exception e) {
178+
logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e);
179+
return -1;
180+
}
181+
}
182+
173183
public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
174184
String[] tokens = filename.split(METADATA_SEPARATOR);
175185
if (tokens.length < 7) {

0 commit comments

Comments
 (0)