Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@

package org.elasticsearch.snapshots;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
Expand All @@ -23,17 +27,30 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Predicates;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -819,6 +836,17 @@ public void testAllFeatures() {
}
});

if (randomBoolean()) {
// Sometimes also simulate bwc repository contents where some details are missing from the root blob
safeAwait(l -> {
try (var listeners = new RefCountingListener(l.map(v -> null))) {
for (final var repositoryName : randomSubsetOf(repositories)) {
removeDetailsForRandomSnapshots(repositoryName, listeners.acquire());
}
}
});
}

Predicate<SnapshotInfo> snapshotInfoPredicate = Predicates.always();

// {repository} path parameter
Expand Down Expand Up @@ -1000,4 +1028,102 @@ public void testAllFeatures() {

assertEquals(0, remaining);
}

/**
 * Older versions of Elasticsearch don't record in {@link RepositoryData} all the details needed for the get-snapshots API to pick out
 * the right snapshots, so in this case the API must fall back to reading those details from each candidate {@link SnapshotInfo} blob.
 * Simulate this situation by manipulating the {@link RepositoryData} blob directly to remove all the optional details from some subset
 * of its snapshots.
 *
 * @param repositoryName name of an FS-backed repository whose root {@code index-N} blob will be rewritten in place on disk
 * @param listener       completed once the repository has been re-registered and the stripped details verified missing,
 *                       or notified of the failure
 */
private static void removeDetailsForRandomSnapshots(String repositoryName, ActionListener<Void> listener) {
    // Records which snapshots had their details stripped, so the final step can verify hasMissingDetails() agrees.
    // Concurrent set because the listener-chain steps below may run on different threads.
    final Set<SnapshotId> snapshotsWithoutDetails = ConcurrentCollections.newConcurrentSet();
    final var masterRepositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
    final var repository = asInstanceOf(FsRepository.class, masterRepositoriesService.repository(repositoryName));
    // Captured up front: metadata/settings are needed later to re-register the repository after deletion.
    final var repositoryMetadata = repository.getMetadata();
    final var repositorySettings = repositoryMetadata.settings();
    // Filesystem path of the current root RepositoryData blob (INDEX_FILE_PREFIX + current generation).
    final var repositoryDataBlobPath = asInstanceOf(FsBlobStore.class, repository.blobStore()).path()
        .resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryMetadata.generation());

    SubscribableListener

        // unregister the repository while we're mucking around with its internals
        .<AcknowledgedResponse>newForked(
            l -> client().execute(
                TransportDeleteRepositoryAction.TYPE,
                new DeleteRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repositoryName),
                l
            )
        )
        .andThenAccept(ElasticsearchAssertions::assertAcked)

        // rewrite the RepositoryData blob with some details removed
        .andThenAccept(ignored -> {
            // load the existing RepositoryData JSON blob as raw maps/lists/etc.
            final var repositoryDataBytes = Files.readAllBytes(repositoryDataBlobPath);
            final var repositoryDataMap = XContentHelper.convertToMap(
                JsonXContent.jsonXContent,
                repositoryDataBytes,
                0,
                repositoryDataBytes.length,
                true
            );

            // modify the contents
            final var snapshotsList = asInstanceOf(List.class, repositoryDataMap.get("snapshots"));
            for (final var snapshotObj : snapshotsList) {
                // randomly skip some snapshots so only a subset loses its details
                if (randomBoolean()) {
                    continue;
                }
                final var snapshotMap = asInstanceOf(Map.class, snapshotObj);
                snapshotsWithoutDetails.add(
                    new SnapshotId(
                        asInstanceOf(String.class, snapshotMap.get("name")),
                        asInstanceOf(String.class, snapshotMap.get("uuid"))
                    )
                );

                // remove the optional details fields; assertNotNull verifies each field was actually present beforehand
                assertNotNull(snapshotMap.remove("start_time_millis"));
                assertNotNull(snapshotMap.remove("end_time_millis"));
                assertNotNull(snapshotMap.remove("slm_policy"));
            }

            // overwrite the RepositoryData JSON blob with its new contents
            final var updatedRepositoryDataBytes = XContentTestUtils.convertToXContent(repositoryDataMap, XContentType.JSON);
            try (var outputStream = Files.newOutputStream(repositoryDataBlobPath)) {
                // copy the BytesReference chunk by chunk; iterator().next() returns null when exhausted
                BytesRef bytesRef;
                final var iterator = updatedRepositoryDataBytes.iterator();
                while ((bytesRef = iterator.next()) != null) {
                    outputStream.write(bytesRef.bytes, bytesRef.offset, bytesRef.length);
                }
            }
        })

        // re-register the repository
        .<AcknowledgedResponse>andThen(
            l -> client().execute(
                TransportPutRepositoryAction.TYPE,
                new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repositoryName).type(FsRepository.TYPE)
                    .settings(repositorySettings),
                l
            )
        )
        .andThenAccept(ElasticsearchAssertions::assertAcked)

        // verify that the details are indeed now missing
        .<RepositoryData>andThen(
            l -> masterRepositoriesService.repository(repositoryName).getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l)
        )
        .andThenAccept(repositoryData -> {
            // every snapshot we stripped — and only those — must now report missing details
            for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
                assertEquals(
                    repositoryName + "/" + snapshotId.toString() + ": " + repositoryData.getSnapshotDetails(snapshotId),
                    snapshotsWithoutDetails.contains(snapshotId),
                    repositoryData.hasMissingDetails(snapshotId)
                );
            }
        })

        // complete the caller's listener whether the chain succeeded or failed
        .addListener(listener);
}
}