Skip to content

Commit

Permalink
Merge branch 'main' into alloc-routing
Browse files Browse the repository at this point in the history
  • Loading branch information
mch2 committed Sep 3, 2024
2 parents 5f02094 + 3fc0139 commit aac9334
Show file tree
Hide file tree
Showing 10 changed files with 619 additions and 327 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- [Reader Writer Separation] Add routing preference for search replicas ([#15563](https://github.com/opensearch-project/OpenSearch/pull/15563))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,35 @@ public Translog newTranslog(

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository);
return new RemoteFsTranslog(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings
);
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
return new RemoteFsTimestampAwareTranslog(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings
);
} else {
return new RemoteFsTranslog(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings
);
}
}

public Repository getRepository() {
Expand Down

Large diffs are not rendered by default.

309 changes: 37 additions & 272 deletions server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.Numbers;
import org.opensearch.common.Priority;
import org.opensearch.common.Randomness;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -831,7 +832,7 @@ boolean getPrefixModeVerification() {
* maintains single lazy instance of {@link BlobContainer}
*/
protected BlobContainer blobContainer() {
// assertSnapshotOrGenericThread();
assertSnapshotOrGenericThread();

BlobContainer blobContainer = this.blobContainer.get();
if (blobContainer == null) {
Expand Down Expand Up @@ -1204,6 +1205,10 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
ActionListener<Void> listener
) {
final List<Tuple<BlobPath, String>> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults);
long startTimeNs = System.nanoTime();
Randomness.shuffle(filesToDelete);
logger.debug("[{}] shuffled the filesToDelete with timeElapsedNs={}", metadata.name(), (System.nanoTime() - startTimeNs));

if (filesToDelete.isEmpty()) {
listener.onResponse(null);
return;
Expand All @@ -1221,8 +1226,8 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
// Start as many workers as fit into the snapshot_deletion pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, groupedListener);
}
Expand Down Expand Up @@ -1326,7 +1331,7 @@ private void executeStaleShardDelete(
if (filesToDelete == null) {
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
try {
// filtering files for which remote store lock release and cleanup succeeded,
// remaining files for which it failed will be retried in next snapshot delete run.
Expand Down Expand Up @@ -1390,7 +1395,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted
) {

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
final List<IndexId> indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);

if (indices.isEmpty()) {
Expand Down Expand Up @@ -1578,7 +1583,7 @@ private void cleanupStaleBlobs(
listener.onResponse(deleteResult);
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
Expand Down Expand Up @@ -1781,7 +1786,7 @@ void cleanupStaleIndices(

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(),
foundIndices.size() - survivingIndexIds.size()
);
for (int i = 0; i < workers; ++i) {
Expand Down Expand Up @@ -1833,7 +1838,7 @@ private void executeOneStaleIndexDelete(
return;
}
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.supply(listener, () -> {
try {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
List<String> matchingShardPaths = findMatchingShardPaths(indexSnId, snapshotShardPaths);
Expand Down Expand Up @@ -2097,8 +2102,7 @@ public void finalizeSnapshot(
stateTransformer,
repositoryUpdatePriority,
ActionListener.wrap(newRepoData -> {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
listener.onResponse(newRepoData);
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData, newRepoData, listener);
}, onUpdateFailure)
);
}, onUpdateFailure), 2 + indices.size());
Expand Down Expand Up @@ -2254,7 +2258,12 @@ private void logShardPathsOperationWarning(IndexId indexId, SnapshotId snapshotI
}

// Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
private void cleanupOldShardGens(
RepositoryData existingRepositoryData,
RepositoryData updatedRepositoryData,
RepositoryData newRepositoryData,
ActionListener<RepositoryData> listener
) {
final List<String> toDelete = new ArrayList<>();
updatedRepositoryData.shardGenerations()
.obsoleteShardGenerations(existingRepositoryData.shardGenerations())
Expand All @@ -2263,10 +2272,62 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
(shardId, oldGen) -> toDelete.add(shardPath(indexId, shardId).buildAsString() + INDEX_FILE_PREFIX + oldGen)
)
);
if (toDelete.isEmpty()) {
listener.onResponse(newRepositoryData);
return;
}
try {
deleteFromContainer(rootBlobContainer(), toDelete);
AtomicInteger counter = new AtomicInteger();
Collection<List<String>> subList = toDelete.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
.values();
final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
logger.info(
"[{}] cleanupOldShardGens toDeleteSize={} groupSize={}",
metadata.name(),
toDelete.size(),
staleFilesToDeleteInBatch.size()
);
final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.wrap(r -> {
logger.info("[{}] completed cleanupOldShardGens", metadata.name());
listener.onResponse(newRepositoryData);
}, ex -> {
logger.error(new ParameterizedMessage("[{}] exception in cleanupOldShardGens", metadata.name()), ex);
listener.onResponse(newRepositoryData);
}), staleFilesToDeleteInBatch.size());

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeOldShardGensCleanup(staleFilesToDeleteInBatch, groupedListener);
}
} catch (Exception e) {
logger.warn("Failed to clean up old shard generation blobs", e);
logger.warn(new ParameterizedMessage(" [{}] Failed to clean up old shard generation blobs", metadata.name()), e);
listener.onResponse(newRepositoryData);
}
}

private void executeOldShardGensCleanup(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(rootBlobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"[{}] Failed to delete following blobs during cleanupOldFiles : {}",
metadata.name(),
filesToDelete
),
e
);
l.onFailure(e);
}
executeOldShardGensCleanup(staleFilesToDeleteInBatch, listener);
}));
}
}

Expand Down Expand Up @@ -2383,10 +2444,11 @@ public long getRemoteDownloadThrottleTimeInNanos() {
}

protected void assertSnapshotOrGenericThread() {
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_DELETION + ']')
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
|| Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread ["
+ Thread.currentThread()
+ "] to be the snapshot or generic thread.";
+ "] to be the snapshot_deletion or snapshot or generic thread.";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ public SnapshotInfo(final StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
remoteStoreIndexShallowCopy = in.readOptionalBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_17_0)) {
pinnedTimestamp = in.readVLong();
}
}
Expand Down Expand Up @@ -940,7 +940,7 @@ public void writeTo(final StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalBoolean(remoteStoreIndexShallowCopy);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeVLong(pinnedTimestamp);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public static class Names {
public static final String REFRESH = "refresh";
public static final String WARMER = "warmer";
public static final String SNAPSHOT = "snapshot";
public static final String SNAPSHOT_DELETION = "snapshot_deletion";
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
Expand Down Expand Up @@ -176,6 +177,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.REFRESH, ThreadPoolType.SCALING);
map.put(Names.WARMER, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT_DELETION, ThreadPoolType.SCALING);
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
Expand Down Expand Up @@ -234,6 +236,7 @@ public ThreadPool(
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors);
final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512);
final int snapshotDeletionPoolMax = boundedBy(4 * allocatedProcessors, 64, 256);
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000));
Expand All @@ -251,6 +254,10 @@ public ThreadPool(
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(
Names.SNAPSHOT_DELETION,
new ScalingExecutorBuilder(Names.SNAPSHOT_DELETION, 1, snapshotDeletionPoolMax, TimeValue.timeValueMinutes(5))
);
builders.put(
Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))
Expand Down
Loading

0 comments on commit aac9334

Please sign in to comment.