Skip to content

Commit

Permalink
Optimize remote store GC flow with pinned timestamps (#15943)
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
(cherry picked from commit dc4dbce)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Sep 25, 2024
1 parent 7a95d56 commit 7dd7c38
Show file tree
Hide file tree
Showing 11 changed files with 854 additions and 227 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}

// Check last fetch status of pinned timestamps. If stale, return.
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
return;
}
Expand Down Expand Up @@ -1009,7 +1009,8 @@ public static void remoteDirectoryCleanup(
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
boolean forceClean
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
Expand All @@ -1018,8 +1019,12 @@ public static void remoteDirectoryCleanup(
shardId,
pathStrategy
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
if (forceClean) {
remoteSegmentStoreDirectory.delete();

Check warning on line 1023 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L1023

Added line #L1023 was not covered by tests
} else {
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
}
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
Expand All @@ -1038,7 +1043,10 @@ private boolean deleteIfEmpty() throws IOException {
logger.info("Remote directory still has files, not deleting the path");
return false;
}
return delete();
}

private boolean delete() {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false);

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
Expand Down Expand Up @@ -105,6 +107,11 @@ protected void onDelete() {
}
}

@Override
protected void onMinRemoteGenReferencedChange() {
triggerTrimOnMinRemoteGenReferencedChange.set(true);
}

@Override
public void trimUnreferencedReaders() throws IOException {
trimUnreferencedReaders(false, true);
Expand Down Expand Up @@ -135,14 +142,22 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) {
return;
}

// This is to fail fast and avoid listing md files un-necessarily.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
return;
}

// This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata
// call in each invocation of trimUnreferencedReaders
if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) {
return;
} else if (triggerTrimOnMinRemoteGenReferencedChange.get()) {
triggerTrimOnMinRemoteGenReferencedChange.set(false);
}

// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
Expand All @@ -158,24 +173,20 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.size() <= 1) {
if (indexDeleted == false && metadataFiles.size() <= 1) {
logger.debug("No stale translog metadata files found");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}

// Check last fetch status of pinned timestamps. If stale, return.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");

Check warning on line 184 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L184

Added line #L184 was not covered by tests
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}

List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
logger
);
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted);

// If index is not deleted, make sure to keep latest metadata file
if (indexDeleted == false) {
Expand All @@ -194,10 +205,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted
indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeepInRemote()
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
Expand All @@ -208,7 +220,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
Expand All @@ -217,11 +233,10 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
remoteGenerationDeletionPermits.release();

Check warning on line 239 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L239

Added line #L239 was not covered by tests
}
} catch (Exception e) {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
Expand All @@ -237,18 +252,16 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

private long getMinGenerationToKeepInRemote() {
return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep();
}

// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
boolean indexDeleted
long minGenerationToKeepInRemote
) throws IOException {
long maxGenerationToBeDeleted = Long.MAX_VALUE;

if (indexDeleted == false) {
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
}

Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
Expand All @@ -262,21 +275,31 @@ protected Set<Long> getGenerationsToBeDeleted(
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
// Check if the generation is not referred by metadata file matching pinned timestamps
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
// The check with minGenerationToKeep is redundant but kept as to make sure we don't delete generations
// that are not persisted in remote segment store yet.
if (generation < minGenerationToKeepInRemote && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
}
}
return generationsToBeDeleted;
}

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
return getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
getMinGenerationToKeepInRemote(),
indexDeleted,
logger
);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
long minGenerationToKeepInRemote,
boolean indexDeleted,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Expand Down Expand Up @@ -312,6 +335,22 @@ protected static List<String> getMetadataFilesToBeDeleted(
metadataFilesToBeDeleted.size()
);

if (indexDeleted == false) {
// Filter out metadata files based on minGenerationToKeep
List<String> metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> {
long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md);
return maxGeneration == -1 || maxGeneration >= minGenerationToKeepInRemote;
}).collect(Collectors.toList());
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep);

logger.trace(
"metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}",
metadataFilesContainingMinGenerationToKeep.size(),
metadataFilesToBeDeleted.size(),
minGenerationToKeepInRemote
);
}

return metadataFilesToBeDeleted;
}

Expand Down Expand Up @@ -472,50 +511,60 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
}
}

public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
public static void cleanupOfDeletedIndex(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException {
if (forceClean) {
translogTransferManager.delete();

Check warning on line 516 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L516

Added line #L516 was not covered by tests
} else {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {

Check warning on line 518 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L518

Added line #L518 was not covered by tests
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

Check warning on line 521 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L521

Added line #L521 was not covered by tests

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;

Check warning on line 526 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L525-L526

Added lines #L525 - L526 were not covered by tests
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(

Check warning on line 528 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L528

Added line #L528 was not covered by tests
metadataFiles,
new HashMap<>(),
Long.MAX_VALUE,
true, // This method gets called when the index is no longer present
staticLogger
);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;

Check warning on line 537 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L536-L537

Added lines #L536 - L537 were not covered by tests
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

Check warning on line 539 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L539

Added line #L539 was not covered by tests

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Check warning on line 544 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L542-L544

Added lines #L542 - L544 were not covered by tests

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

Check warning on line 547 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L547

Added line #L547 was not covered by tests

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
// Delete stale primary terms
deleteStaleRemotePrimaryTerms(

Check warning on line 550 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L550

Added line #L550 was not covered by tests
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

Check warning on line 560 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L557-L560

Added lines #L557 - L560 were not covered by tests

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);

Check warning on line 567 in server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java#L567

Added line #L567 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -683,12 +683,17 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
minRemoteGenReferenced = getMinFileGeneration();
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
onMinRemoteGenReferencedChange();
}
logger.debug(
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}",
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}",
primaryTerm,
generation,
maxSeqNo
maxSeqNo,
minRemoteGenReferenced
);
}

Expand All @@ -702,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}

protected void onMinRemoteGenReferencedChange() {

}

@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,16 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
}
}

public static long getMaxGenerationFromFileName(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
try {
return RemoteStoreUtils.invertLong(tokens[2]);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e);
return -1;

Check warning on line 179 in server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java#L177-L179

Added lines #L177 - L179 were not covered by tests
}
}

public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
Expand Down
Loading

0 comments on commit 7dd7c38

Please sign in to comment.