Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public final class RepositoryData {
*/
public static final long UNKNOWN_REPO_GEN = -2L;

/**
* The generation value indicating that the repository generation could not be determined.
*/
public static final long CORRUPTED_REPO_GEN = -3L;

/**
* An instance initialized for an empty repository.
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -193,13 +194,16 @@ public void testSnapshotWithConflictingName() throws IOException {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
}
};
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();
return repository;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import java.nio.file.Files;
import java.nio.file.Path;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {

public void testConcurrentlyChangeRepositoryContents() throws Exception {
Client client = client();

Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData =
getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName));
Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1)));

assertRepositoryBlocked(client, repoName, snapshot);

if (randomBoolean()) {
logger.info("--> move index-N blob back to initial generation");
Files.move(repo.resolve("index-" + (repositoryData.getGenId() + 1)), repo.resolve("index-" + repositoryData.getGenId()));

logger.info("--> verify repository remains blocked");
assertRepositoryBlocked(client, repoName, snapshot);
}

logger.info("--> remove repository");
assertAcked(client.admin().cluster().prepareDeleteRepository(repoName));

logger.info("--> recreate repository");
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}

public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception {
Client client = client();

Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));

logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1));

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2));

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}

public void testFindDanglingLatestGeneration() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));

logger.info("--> set next generation as pending in the cluster state");
final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
.putCustom(RepositoriesMetaData.TYPE,
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
}

@Override
public void onFailure(String source, Exception e) {
csUpdateFuture.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
csUpdateFuture.onResponse(null);
}
}
);
csUpdateFuture.get();

logger.info("--> full cluster restart");
internalCluster().fullRestart();
ensureGreen();

Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));

logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}

private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
logger.info("--> try to delete snapshot");
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
() -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet());
assertThat(repositoryException3.getMessage(),
containsString("Could not read repository data because the contents of the repository do not match its expected state."));

logger.info("--> try to create snapshot");
final RepositoryException repositoryException4 = expectThrows(RepositoryException.class,
() -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet());
assertThat(repositoryException4.getMessage(),
containsString("Could not read repository data because the contents of the repository do not match its expected state."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public void testOverwriteSnapshotInfoBlob() {
try (BlobStoreRepository repository =
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();

// We create a snap- blob for snapshot "foo" in the first generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
deleteAndAssertEmpty(getRepository().basePath());
client().admin().cluster().prepareDeleteRepository("test-repo").get();
super.tearDown();
}

Expand Down Expand Up @@ -168,8 +169,6 @@ protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, Blo
}

public void testCleanup() throws Exception {
createRepository("test-repo");

createIndex("test-idx-1");
createIndex("test-idx-2");
createIndex("test-idx-3");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -29,6 +30,8 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -69,6 +72,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.test.ESTestCase.buildNewFakeTransportAddress;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -326,7 +330,11 @@ private static ClusterService mockClusterService(ClusterState initialState) {
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
// Setting local node as master so it may update the repository metadata in the cluster state
final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(
ClusterState.builder(initialState).nodes(
DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build());
when(clusterService.state()).then(invocationOnMock -> currentState.get());
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -353,8 +354,12 @@ private Environment createEnvironment() {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData));
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService);
clusterService.addStateApplier(e -> repository.updateState(e.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
return repository;
}

private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
Expand Down