From fdf484c6625a3f4e80990337e2c693de38e00c26 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 15 Jul 2021 14:08:59 +0200 Subject: [PATCH 01/11] add snapshots to delete to repository metadata --- .../get/GetRepositoriesResponse.java | 7 +- .../metadata/RepositoriesMetadata.java | 37 +++++- .../cluster/metadata/RepositoryMetadata.java | 81 ++++++++++-- .../elasticsearch/snapshots/SnapshotId.java | 14 ++ .../snapshots/SnapshotsService.java | 2 + ...epositoriesMetadataSerializationTests.java | 125 +++++++++++------- .../snapshots/SnapshotIdTests.java | 57 ++++++++ 7 files changed, 265 insertions(+), 58 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java index 93ecc7e7f5da6..57640d9bf4032 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java @@ -55,7 +55,12 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - repositories.toXContent(builder, new DelegatingMapParams(Map.of(RepositoriesMetadata.HIDE_GENERATIONS_PARAM, "true"), params)); + repositories.toXContent(builder, + new DelegatingMapParams( + Map.of( + RepositoriesMetadata.HIDE_GENERATIONS_PARAM, "true", + RepositoriesMetadata.HIDE_SNAPSHOTS_TO_DELETE, "true" + ), params)); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java index 71b66ffd3c978..50534a0b9af92 
100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java @@ -22,12 +22,16 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.function.UnaryOperator; /** @@ -45,6 +49,12 @@ public class RepositoriesMetadata extends AbstractNamedDiffable implemen */ public static final String HIDE_GENERATIONS_PARAM = "hide_generations"; + /** + * Serialization parameter used to hide the {@link RepositoryMetadata#snapshotsToDelete()} + * in {@link org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse}. 
+ */ + public static final String HIDE_SNAPSHOTS_TO_DELETE = "hide_snapshots_to_delete"; + private final List repositories; /** @@ -79,6 +89,14 @@ public RepositoriesMetadata withUuid(String repoName, String uuid) { return withUpdate(repoName, repositoryMetadata -> repositoryMetadata.withUuid(uuid)); } + public RepositoriesMetadata addSnapshotsToDelete(String repoName, Collection snapshotsToDelete) { + return withUpdate(repoName, repositoryMetadata -> repositoryMetadata.addSnapshotsToDelete(snapshotsToDelete)); + } + + public RepositoriesMetadata removeSnapshotsToDelete(String repoName, Collection snapshotsToDelete) { + return withUpdate(repoName, repositoryMetadata -> repositoryMetadata.removeSnapshotsToDelete(snapshotsToDelete)); + } + private RepositoriesMetadata withUpdate(String repoName, UnaryOperator update) { int indexOfRepo = -1; for (int i = 0; i < repositories.size(); i++) { @@ -200,6 +218,7 @@ public static RepositoriesMetadata fromXContent(XContentParser parser) throws IO Settings settings = Settings.EMPTY; long generation = RepositoryData.UNKNOWN_REPO_GEN; long pendingGeneration = RepositoryData.EMPTY_REPO_GEN; + final List snapshotsToDelete = new ArrayList<>(); while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String currentFieldName = parser.currentName(); @@ -228,6 +247,13 @@ public static RepositoriesMetadata fromXContent(XContentParser parser) throws IO throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name); } pendingGeneration = parser.longValue(); + } else if ("snapshots_to_delete".equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.START_ARRAY) { + throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name); + } + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + snapshotsToDelete.add(SnapshotId.parse(parser)); + } } else { throw new 
ElasticsearchParseException("failed to parse repository [{}], unknown field [{}]", name, currentFieldName); @@ -239,7 +265,9 @@ public static RepositoriesMetadata fromXContent(XContentParser parser) throws IO if (type == null) { throw new ElasticsearchParseException("failed to parse repository [{}], missing repository type", name); } - repository.add(new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration)); + repository.add( + new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, List.copyOf(snapshotsToDelete)) + ); } else { throw new ElasticsearchParseException("failed to parse repositories"); } @@ -284,6 +312,13 @@ public static void toXContent(RepositoryMetadata repository, XContentBuilder bui builder.field("generation", repository.generation()); builder.field("pending_generation", repository.pendingGeneration()); } + if (params.paramAsBoolean(HIDE_SNAPSHOTS_TO_DELETE, false) == false) { + builder.startArray("snapshots_to_delete"); + for (SnapshotId snapshotToDelete : repository.snapshotsToDelete()) { + builder.value(snapshotToDelete); + } + builder.endArray(); + } builder.endObject(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java index 6e0c2de3593ed..6fa87f9884805 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java @@ -12,9 +12,13 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotsService; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Objects; /** @@ -37,6 +41,11 @@ public 
class RepositoryMetadata implements Writeable { */ private final long pendingGeneration; + /** + * List of {@link org.elasticsearch.snapshots.SnapshotId} marked as "to delete" + */ + private final List snapshotsToDelete; + /** * Constructs new repository metadata * @@ -45,20 +54,29 @@ public class RepositoryMetadata implements Writeable { * @param settings repository settings */ public RepositoryMetadata(String name, String type, Settings settings) { - this(name, RepositoryData.MISSING_UUID, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN); + this(name, RepositoryData.MISSING_UUID, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN, List.of()); } public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) { - this(metadata.name, metadata.uuid, metadata.type, metadata.settings, generation, pendingGeneration); + this(metadata.name, metadata.uuid, metadata.type, metadata.settings, generation, pendingGeneration, List.of()); } - public RepositoryMetadata(String name, String uuid, String type, Settings settings, long generation, long pendingGeneration) { + public RepositoryMetadata( + String name, + String uuid, + String type, + Settings settings, + long generation, + long pendingGeneration, + List snapshotsToDelete + ) { this.name = name; this.uuid = uuid; this.type = type; this.settings = settings; this.generation = generation; this.pendingGeneration = pendingGeneration; + this.snapshotsToDelete = snapshotsToDelete; assert generation <= pendingGeneration : "Pending generation [" + pendingGeneration + "] must be greater or equal to generation [" + generation + "]"; } @@ -127,6 +145,20 @@ public long pendingGeneration() { return pendingGeneration; } + /** + * Returns the list of snapshots from this repository that have been marked as "to delete" after a searchable snapshot index with the + * delete_searchable_snapshot setting got deleted. 
These snapshots will be deleted as soon as possible. + * + * @return a {@link List} of {@link SnapshotId} to delete + */ + public List snapshotsToDelete() { + return snapshotsToDelete; + } + + public boolean hasSnapshotsToDelete() { + return snapshotsToDelete.isEmpty() == false; + } + public RepositoryMetadata(StreamInput in) throws IOException { name = in.readString(); if (in.getVersion().onOrAfter(SnapshotsService.UUIDS_IN_REPO_DATA_VERSION)) { @@ -138,6 +170,11 @@ public RepositoryMetadata(StreamInput in) throws IOException { settings = Settings.readSettingsFromStream(in); generation = in.readLong(); pendingGeneration = in.readLong(); + if (in.getVersion().onOrAfter(SnapshotsService.DELETE_SEARCHABLE_SNAPSHOT_ON_INDEX_DELETION_VERSION)) { + snapshotsToDelete = List.copyOf(in.readList(SnapshotId::new)); + } else { + snapshotsToDelete = List.of(); + } } /** @@ -155,6 +192,9 @@ public void writeTo(StreamOutput out) throws IOException { Settings.writeSettingsToStream(settings, out); out.writeLong(generation); out.writeLong(pendingGeneration); + if (out.getVersion().onOrAfter(SnapshotsService.DELETE_SEARCHABLE_SNAPSHOT_ON_INDEX_DELETION_VERSION)) { + out.writeList(snapshotsToDelete); + } } /** @@ -164,7 +204,11 @@ public void writeTo(StreamOutput out) throws IOException { * @return {@code true} if both instances equal in all fields but the generation fields */ public boolean equalsIgnoreGenerations(RepositoryMetadata other) { - return name.equals(other.name) && uuid.equals(other.uuid()) && type.equals(other.type()) && settings.equals(other.settings()); + return name.equals(other.name) + && uuid.equals(other.uuid()) + && type.equals(other.type()) + && settings.equals(other.settings()) + && Objects.equals(snapshotsToDelete, other.snapshotsToDelete); } @Override @@ -179,25 +223,44 @@ public boolean equals(Object o) { if (type.equals(that.type) == false) return false; if (generation != that.generation) return false; if (pendingGeneration != that.pendingGeneration) return 
false; - return settings.equals(that.settings); + if (settings.equals(that.settings) == false) return false; + return Objects.equals(snapshotsToDelete, that.snapshotsToDelete); } @Override public int hashCode() { - return Objects.hash(name, uuid, type, settings, generation, pendingGeneration); + return Objects.hash(name, uuid, type, settings, generation, pendingGeneration, snapshotsToDelete); } @Override public String toString() { return "RepositoryMetadata{" + name + "}{" + uuid + "}{" + type + "}{" + settings + "}{" - + generation + "}{" + pendingGeneration + "}"; + + generation + "}{" + pendingGeneration + "}{" + snapshotsToDelete + '}'; } public RepositoryMetadata withUuid(String uuid) { - return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration); + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, snapshotsToDelete); } public RepositoryMetadata withSettings(Settings settings) { - return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration); + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, snapshotsToDelete); + } + + public RepositoryMetadata addSnapshotsToDelete(Collection snapshotsToDelete) { + final List snapshots = new ArrayList<>(this.snapshotsToDelete); + for (SnapshotId snapshotToDelete : snapshotsToDelete) { + assert snapshots.contains(snapshotToDelete) == false : name + " found duplicate snapshot to delete: " + snapshotToDelete; + snapshots.add(snapshotToDelete); + } + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, List.copyOf(snapshots)); + } + + public RepositoryMetadata removeSnapshotsToDelete(Collection snapshotsToDelete) { + final List snapshots = new ArrayList<>(this.snapshotsToDelete); + if (snapshots.removeAll(snapshotsToDelete)) { + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, List.copyOf(snapshots)); + } else { + return 
this; + } } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java index c534b5970d239..683ff93605515 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java @@ -11,8 +11,11 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; import java.util.Objects; @@ -25,6 +28,13 @@ public final class SnapshotId implements Comparable, Writeable, ToXC private static final String NAME = "name"; private static final String UUID = "uuid"; + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, + args -> new SnapshotId((String) args[0], (String) args[1])); + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(NAME)); + PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(UUID)); + } + private final String name; private final String uuid; @@ -117,4 +127,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + public static SnapshotId parse(XContentParser parser) { + return PARSER.apply(parser, null); + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 2a30b1845287a..997a2b94b4d6a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ 
b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -130,6 +130,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0; + public static final Version DELETE_SEARCHABLE_SNAPSHOT_ON_INDEX_DELETION_VERSION = Version.V_8_0_0; + private static final Logger logger = LogManager.getLogger(SnapshotsService.class); public static final String UPDATE_SNAPSHOT_STATUS_ACTION_NAME = "internal:cluster/snapshot/update_snapshot_status"; diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java index f36aa52918924..b53b712873842 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetadataSerializationTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.Metadata.Custom; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; @@ -23,26 +24,22 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; public class RepositoriesMetadataSerializationTests extends AbstractDiffableSerializationTestCase { + /** + * Repository names are used as field names in the serialized XContent and this can fail parsing + * so we use a generator to have unique names. 
+ */ + private static final AtomicLong generator = new AtomicLong(); + @Override protected Custom createTestInstance() { - int numberOfRepositories = randomInt(10); - List entries = new ArrayList<>(); + final int numberOfRepositories = randomIntBetween(1, 20); + final List entries = new ArrayList<>(); for (int i = 0; i < numberOfRepositories; i++) { - // divide by 2 to not overflow when adding to this number for the pending generation below - final long generation = randomNonNegativeLong() / 2L; - entries.add( - new RepositoryMetadata( - randomAlphaOfLength(10), - randomAlphaOfLength(10), - randomAlphaOfLength(10), - randomSettings(), - generation, - generation + randomLongBetween(0, generation) - ) - ); + entries.add(randomRepositoryMetadata()); } entries.sort(Comparator.comparing(RepositoryMetadata::name)); return new RepositoriesMetadata(entries); @@ -55,46 +52,21 @@ protected Writeable.Reader instanceReader() { @Override protected Custom mutateInstance(Custom instance) { - List entries = new ArrayList<>(((RepositoriesMetadata) instance).repositories()); - boolean addEntry = entries.isEmpty() ? 
true : randomBoolean(); - if (addEntry) { - entries.add(new RepositoryMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings())); - } else { + final List entries = new ArrayList<>(((RepositoriesMetadata) instance).repositories()); + if (entries.isEmpty() || randomBoolean()) { + entries.add(randomRepositoryMetadata()); + } else if (randomBoolean()) { entries.remove(randomIntBetween(0, entries.size() - 1)); - } - return new RepositoriesMetadata(entries); - } - - public Settings randomSettings() { - if (randomBoolean()) { - return Settings.EMPTY; } else { - int numberOfSettings = randomInt(10); - Settings.Builder builder = Settings.builder(); - for (int i = 0; i < numberOfSettings; i++) { - builder.put(randomAlphaOfLength(10), randomAlphaOfLength(20)); - } - return builder.build(); + int index = randomIntBetween(0, entries.size() - 1); + entries.add(index, mutateRepositoryMetadata(entries.get(index))); } + return new RepositoriesMetadata(entries); } @Override protected Custom makeTestChanges(Custom testInstance) { - RepositoriesMetadata repositoriesMetadata = (RepositoriesMetadata) testInstance; - List repos = new ArrayList<>(repositoriesMetadata.repositories()); - if (randomBoolean() && repos.size() > 1) { - // remove some elements - int leaveElements = randomIntBetween(0, repositoriesMetadata.repositories().size() - 1); - repos = randomSubsetOf(leaveElements, repos.toArray(new RepositoryMetadata[leaveElements])); - } - if (randomBoolean()) { - // add some elements - int addElements = randomInt(10); - for (int i = 0; i < addElements; i++) { - repos.add(new RepositoryMetadata(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings())); - } - } - return new RepositoriesMetadata(repos); + return mutateInstance(testInstance); } @Override @@ -117,4 +89,63 @@ protected Custom doParseInstance(XContentParser parser) throws IOException { return new RepositoriesMetadata(repos); } + private RepositoryMetadata 
mutateRepositoryMetadata(RepositoryMetadata instance) { + if (randomBoolean()) { + return instance.withUuid(randomValueOtherThan(instance.uuid(), UUIDs::randomBase64UUID)); + } else if (randomBoolean()) { + return instance.withSettings(randomValueOtherThan(instance.settings(), this::randomSettings)); + } else if (instance.hasSnapshotsToDelete() == false || randomBoolean()) { + final SnapshotId snapshotId = randomValueOtherThanMany(s -> instance.snapshotsToDelete().contains(s), this::randomSnapshotId); + return instance.addSnapshotsToDelete(List.of(snapshotId)); + } else { + return instance.removeSnapshotsToDelete( + randomSubsetOf(randomIntBetween(0, instance.snapshotsToDelete().size() - 1), instance.snapshotsToDelete()) + ); + } + } + + private RepositoryMetadata randomRepositoryMetadata() { + String name = String.valueOf(generator.getAndIncrement()) + '-' + randomAlphaOfLengthBetween(1, 10); + String type = randomAlphaOfLengthBetween(1, 10); + Settings settings = randomSettings(); + if (randomBoolean()) { + return new RepositoryMetadata(name, type, settings); + } + String uuid = UUIDs.randomBase64UUID(); + // divide by 2 to not overflow when adding to this number for the pending generation below + long generation = randomNonNegativeLong() / 2L; + long pendingGeneration = generation + randomLongBetween(0, generation); + List snapshotsToDelete = randomSnapshotsToDelete(); + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, snapshotsToDelete); + } + + private Settings randomSettings() { + if (randomBoolean()) { + return Settings.EMPTY; + } else { + int numberOfSettings = randomInt(10); + Settings.Builder builder = Settings.builder(); + for (int i = 0; i < numberOfSettings; i++) { + builder.put(randomAlphaOfLength(10), randomAlphaOfLength(20)); + } + return builder.build(); + } + } + + private List randomSnapshotsToDelete() { + if (randomBoolean()) { + return List.of(); + } else { + final int numberOfSnapshots = 
randomIntBetween(1, 10); + final List snapshotIds = new ArrayList<>(numberOfSnapshots); + for (int i = 0; i < numberOfSnapshots; i++) { + snapshotIds.add(randomSnapshotId()); + } + return List.copyOf(snapshotIds); + } + } + + private SnapshotId randomSnapshotId() { + return new SnapshotId(randomAlphaOfLengthBetween(1, 10), UUIDs.randomBase64UUID()); + } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java new file mode 100644 index 0000000000000..a89737a43c3bc --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; + +import java.io.IOException; + +public class SnapshotIdTests extends AbstractSerializingTestCase { + + @Override + protected SnapshotId doParseInstance(XContentParser parser) throws IOException { + return SnapshotId.parse(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return SnapshotId::new; + } + + @Override + protected SnapshotId createTestInstance() { + return new SnapshotId(randomSnapshotName(), randomSnapshotUuid()); + } + + @Override + protected SnapshotId mutateInstance(SnapshotId instance) throws IOException { + if (randomBoolean()) { + return new SnapshotId( + randomValueOtherThan(instance.getName(), SnapshotIdTests::randomSnapshotName), + instance.getUUID() + ); + } else { + return new SnapshotId( + instance.getName(), + randomValueOtherThan(instance.getUUID(), SnapshotIdTests::randomSnapshotUuid) + ); + } + } + + private static String randomSnapshotName() { + return randomAlphaOfLengthBetween(1, 10); + } + + private static String randomSnapshotUuid() { + return UUIDs.randomBase64UUID(); + } +} From ba6b73b19b5a34c094d766fb90e49d3637c828bc Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 16 Jul 2021 10:53:35 +0200 Subject: [PATCH 02/11] add snapshots to delete metadata --- .../get/GetRepositoriesResponse.java | 11 +- .../metadata/MetadataDeleteIndexService.java | 85 +++++++++++++ .../metadata/RepositoriesMetadata.java | 4 +- .../snapshots/RestoreService.java | 6 +- .../SearchableSnapshotsSettings.java | 5 + .../elasticsearch/snapshots/SnapshotId.java | 7 +- .../MetadataDeleteIndexServiceTests.java | 120 +++++++++++++++++- .../snapshots/SnapshotIdTests.java | 10 +- 8 files changed, 226 insertions(+), 22 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java index 57640d9bf4032..57a2f04b2ad84 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java @@ -55,12 +55,13 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - repositories.toXContent(builder, + repositories.toXContent( + builder, new DelegatingMapParams( - Map.of( - RepositoriesMetadata.HIDE_GENERATIONS_PARAM, "true", - RepositoriesMetadata.HIDE_SNAPSHOTS_TO_DELETE, "true" - ), params)); + Map.of(RepositoriesMetadata.HIDE_GENERATIONS_PARAM, "true", RepositoriesMetadata.HIDE_SNAPSHOTS_TO_DELETE, "true"), + params + ) + ); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index 1fd597cc7f9dc..85f3296a63d94 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -21,21 +21,34 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; +import 
org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; +import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotsService; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import static java.util.Collections.emptyList; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY; + /** * Deletes indices. */ @@ -120,6 +133,23 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) logger.trace("{} tombstones purged from the cluster state. Previous tombstone size: {}. 
Current tombstone size: {}.", graveyardBuilder.getNumPurged(), previousGraveyardSize, currentGraveyard.getTombstones().size()); + // add snapshot(s) marked as to delete to the cluster state + final Map> snapshotsToDelete = listOfSnapshotsToDelete(currentState, indicesToDelete); + if (snapshotsToDelete.isEmpty() == false) { + RepositoriesMetadata repositories = currentState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + boolean changed = false; + for (Map.Entry> snapshotToDelete : snapshotsToDelete.entrySet()) { + RepositoryMetadata repository = repositories.repository(snapshotToDelete.getKey()); + if (repository != null) { + repositories = repositories.addSnapshotsToDelete(repository.name(), snapshotToDelete.getValue()); + changed = true; + } + } + if (changed) { + metadataBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + } + } + Metadata newMetadata = metadataBuilder.build(); ClusterBlocks blocks = clusterBlocksBuilder.build(); @@ -142,4 +172,59 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) .build(), "deleted indices [" + indices + "]"); } + + private static Map> listOfSnapshotsToDelete(final ClusterState currentState, final Set indicesToDelete) { + final Map> snapshotsToDelete = new HashMap<>(); + + for (Index indexToDelete : indicesToDelete) { + final Settings indexSettings = currentState.metadata().getIndexSafe(indexToDelete).getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotIndexWithSnapshotDeletion(indexSettings) == false) { + continue; + } + + final String repositoryName = repositoryNameFromIndexSettings(currentState, indexSettings); + final String snapshotUuid = indexSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY); + + boolean canDeleteSnapshot = true; + for (IndexMetadata other : currentState.metadata()) { + if (indicesToDelete.contains(other.getIndex())) { + continue; // do not check indices that are going to be deleted + } + final Settings 
otherSettings = other.getSettings(); + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(otherSettings) == false) { + continue; // other index is not a searchable snapshot index, skip + } + final String otherSnapshotUuid = otherSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY); + if (Objects.equals(snapshotUuid, otherSnapshotUuid) == false) { + continue; // other index is backed by a different snapshot, skip + } + final String otherRepositoryName = repositoryNameFromIndexSettings(currentState, otherSettings); + if (Objects.equals(repositoryName, otherRepositoryName) == false) { + continue; // other index is backed by a snapshot from a different repository, skip + } + assert otherSettings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, false) : other; + canDeleteSnapshot = false; // another index is using the same snapshot, do not delete + break; + } + if (canDeleteSnapshot) { + snapshotsToDelete.computeIfAbsent(repositoryName, r -> new HashSet<>()) + .add(new SnapshotId(indexSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY), snapshotUuid)); + } + } + return snapshotsToDelete; + } + + private static String repositoryNameFromIndexSettings(ClusterState currentState, Settings indexSettings) { + final String repositoryUuid = indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY); + if (Strings.hasLength(repositoryUuid) == false) { + return indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY); + } + final RepositoriesMetadata repoMetadata = currentState.metadata().custom(RepositoriesMetadata.TYPE); + final List repositories = repoMetadata == null ? 
emptyList() : repoMetadata.repositories(); + return repositories.stream() + .filter(r -> repositoryUuid.equals(r.uuid())) + .map(RepositoryMetadata::name) + .findFirst() + .orElseThrow(() -> new RepositoryMissingException(repositoryUuid)); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java index 50534a0b9af92..f77087a13cb7c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.Metadata.Custom; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -21,6 +20,7 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.snapshots.SnapshotId; @@ -29,9 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.function.UnaryOperator; /** diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 2140c32228888..3008a209299dc 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -1035,7 +1035,7 @@ private static IndexMetadata updateIndexSettings( "cannot disable 
setting [" + IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey() + "] on restore" ); } - if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(settings))) { + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(settings)) { final Boolean changed = changeSettings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, null); if (changed != null) { final Boolean previous = settings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, null); @@ -1266,7 +1266,7 @@ && isSystemIndex(snapshotIndexMetadata) == false) { ); } - if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(updatedIndexMetadata.getSettings()))) { + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(updatedIndexMetadata.getSettings())) { searchableSnapshotsIndices.add(updatedIndexMetadata.getIndex()); } } @@ -1566,7 +1566,7 @@ private static void ensureSearchableSnapshotsRestorable( continue; // do not check the searchable snapshot index against itself } final Settings otherSettings = other.getSettings(); - if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(otherSettings)) == false) { + if (SearchableSnapshotsSettings.isSearchableSnapshotStore(otherSettings) == false) { continue; // other index is not a searchable snapshot index, skip } final String otherSnapshotUuid = otherSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SearchableSnapshotsSettings.java b/server/src/main/java/org/elasticsearch/snapshots/SearchableSnapshotsSettings.java index 46f8ac4816341..6bbd32946bb89 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SearchableSnapshotsSettings.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SearchableSnapshotsSettings.java @@ -31,4 +31,9 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) { public static boolean isPartialSearchableSnapshotIndex(Settings indexSettings) { return isSearchableSnapshotStore(indexSettings) && 
indexSettings.getAsBoolean(SEARCHABLE_SNAPSHOT_PARTIAL_SETTING_KEY, false); } + + public static boolean isSearchableSnapshotIndexWithSnapshotDeletion(Settings indexSettings) { + return isSearchableSnapshotStore(indexSettings) + && indexSettings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, false); + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java index 683ff93605515..4aa0ec92e8dc7 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotId.java @@ -28,8 +28,11 @@ public final class SnapshotId implements Comparable, Writeable, ToXC private static final String NAME = "name"; private static final String UUID = "uuid"; - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - args -> new SnapshotId((String) args[0], (String) args[1])); + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, + true, + args -> new SnapshotId((String) args[0], (String) args[1]) + ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(NAME)); PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField(UUID)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index d241862569e9b..3cba76591b84e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -13,12 +13,15 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import 
org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInProgressException; @@ -38,8 +41,16 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY; +import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -159,6 +170,113 @@ public void testDeleteCurrentWriteIndexForDataStream() { dataStreamName + "] and cannot be deleted")); } + public void testDeleteIndexWithSnapshotDeletion() { + final boolean deleteSnapshot = randomBoolean(); + final IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings(Settings.builder() + 
.put("index.version.created", VersionUtils.randomVersion(random())) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE) + .put(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY, "repo_name") + .put(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY, randomBoolean() ? null : "repo_uuid") + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY, "snap_name") + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY, "snap_uuid") + .put(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, deleteSnapshot) + .build()) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + final ClusterState initialState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(Metadata.builder() + .put(indexMetadata, false) + .putCustom(RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of(new RepositoryMetadata("repo_name", "fs", Settings.EMPTY).withUuid("repo_uuid"))))) + .routingTable(RoutingTable.builder().addAsNew(indexMetadata).build()) + .blocks(ClusterBlocks.builder().addBlocks(indexMetadata)) + .build(); + + final ClusterState updatedState = service.deleteIndices(initialState, Set.of(indexMetadata.getIndex())); + assertThat(updatedState.metadata().getIndices().get("test"), nullValue()); + assertThat(updatedState.blocks().indices().get("test"), nullValue()); + assertThat(updatedState.routingTable().index("test"), nullValue()); + + final RepositoriesMetadata updatedRepos = updatedState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + assertThat(updatedRepos.repository("repo_name"), notNullValue()); + if (deleteSnapshot) { + assertThat(updatedRepos.repository("repo_name").hasSnapshotsToDelete(), equalTo(true)); + assertThat(updatedRepos.repository("repo_name").snapshotsToDelete(), hasSize(1)); + assertThat(updatedRepos.repository("repo_name").snapshotsToDelete(), hasItem(new SnapshotId("snap_name", "snap_uuid"))); + } else { + 
assertThat(updatedRepos.repository("repo_name").hasSnapshotsToDelete(), equalTo(false)); + assertThat(updatedRepos.repository("repo_name").snapshotsToDelete(), hasSize(0)); + } + } + + public void testDeleteMultipleIndicesWithSnapshotDeletion() { + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), "fs", Settings.EMPTY); + if (randomBoolean()) { + repositoryMetadata = repositoryMetadata.withUuid(UUIDs.randomBase64UUID()); + } + + final Metadata.Builder metadataBuilder = Metadata.builder(); + metadataBuilder.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(List.of(repositoryMetadata))); + final RoutingTable.Builder routingBuilder = RoutingTable.builder(); + + final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()); + final int nbIndices = randomIntBetween(2, 10); + final Set indices = new HashSet<>(nbIndices); + + for (int i = 0; i < nbIndices; i++) { + Settings.Builder indexSettingsBuilder = Settings.builder() + .put("index.version.created", VersionUtils.randomVersion(random())) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_STORE_TYPE) + .put(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, true) + .put(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY, repositoryMetadata.name()) + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY, snapshotId.getName()) + .put(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY, snapshotId.getUUID()); + if (randomBoolean()) { + indexSettingsBuilder.put(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY, repositoryMetadata.uuid()); + } + IndexMetadata indexMetadata = IndexMetadata.builder(randomAlphaOfLength(10) + i) + .settings(indexSettingsBuilder.build()) + .numberOfShards(randomIntBetween(1, 3)) + .numberOfReplicas(randomInt(1)) + .build(); + metadataBuilder.put(indexMetadata, false); + routingBuilder.addAsNew(indexMetadata); + 
indices.add(indexMetadata.getIndex()); + } + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .routingTable(routingBuilder.build()) + .metadata(metadataBuilder) + .build(); + + while (indices.size() > 0) { + assertThat(repositoryMetadata, notNullValue()); + assertThat(repositoryMetadata.hasSnapshotsToDelete(), equalTo(false)); + assertThat(repositoryMetadata.snapshotsToDelete(), hasSize(0)); + + List indicesToDelete = randomSubsetOf(randomIntBetween(1, Math.max(1, indices.size() - 1)), indices); + clusterState = service.deleteIndices(clusterState, Set.copyOf(indicesToDelete)); + indicesToDelete.forEach(indices::remove); + + for (Index deletedIndex : indicesToDelete) { + assertThat(clusterState.metadata().index(deletedIndex), nullValue()); + assertThat(clusterState.routingTable().index(deletedIndex), nullValue()); + } + + repositoryMetadata = clusterState.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repository(repositoryMetadata.name()); + } + + assertThat(repositoryMetadata, notNullValue()); + assertThat(repositoryMetadata.hasSnapshotsToDelete(), equalTo(true)); + assertThat(repositoryMetadata.snapshotsToDelete(), hasSize(1)); + assertThat(repositoryMetadata.snapshotsToDelete(), hasItem(snapshotId)); + } + private ClusterState clusterState(String index) { IndexMetadata indexMetadata = IndexMetadata.builder(index) .settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random()))) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java index a89737a43c3bc..b41a3942202f8 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotIdTests.java @@ -35,15 +35,9 @@ protected SnapshotId createTestInstance() { @Override protected SnapshotId mutateInstance(SnapshotId instance) throws IOException { if 
(randomBoolean()) { - return new SnapshotId( - randomValueOtherThan(instance.getName(), SnapshotIdTests::randomSnapshotName), - instance.getUUID() - ); + return new SnapshotId(randomValueOtherThan(instance.getName(), SnapshotIdTests::randomSnapshotName), instance.getUUID()); } else { - return new SnapshotId( - instance.getName(), - randomValueOtherThan(instance.getUUID(), SnapshotIdTests::randomSnapshotUuid) - ); + return new SnapshotId(instance.getName(), randomValueOtherThan(instance.getUUID(), SnapshotIdTests::randomSnapshotUuid)); } } From a4466833fc628c00bca315ef66335077175ab2ce Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 16 Jul 2021 11:52:57 +0200 Subject: [PATCH 03/11] prevent snapshots to delete to be mounted/restored/cloned --- .../snapshots/RestoreService.java | 10 ++ .../snapshots/SnapshotsService.java | 11 ++ ...archableSnapshotsRepositoryIntegTests.java | 111 ++++++++++++++++-- 3 files changed, 124 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 3008a209299dc..3479e73570029 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexStateService; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -1354,6 +1355,15 @@ private void ensureSnapshotNotDeleted(ClusterState currentState) { "cannot restore a snapshot while a snapshot deletion is in-progress [" + deletionsInProgress.getEntries().get(0) + "]" ); } 
+ final RepositoryMetadata repositoryMetadata = currentState.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repository(snapshot.getRepository()); + if (repositoryMetadata != null && repositoryMetadata.snapshotsToDelete().contains(snapshot.getSnapshotId())) { + throw new ConcurrentSnapshotExecutionException( + snapshot, + "cannot restore a snapshot already marked as deleted [" + snapshot.getSnapshotId() + "]" + ); + } } private void applyGlobalStateRestore(ClusterState currentState, Metadata.Builder mdBuilder) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 997a2b94b4d6a..be59e6f004136 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -49,6 +49,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -485,6 +486,16 @@ public ClusterState execute(ClusterState currentState) { "cannot clone from snapshot that is being deleted" ); } + final RepositoryMetadata repositoryMetadata = currentState.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repository(repositoryName); + if (repositoryMetadata != null && repositoryMetadata.snapshotsToDelete().contains(sourceSnapshotId)) { + throw new ConcurrentSnapshotExecutionException( + repositoryName, + sourceSnapshotId.getName(), + "cannot clone a snapshot that is marked as deleted" + ); + } ensureBelowConcurrencyLimit(repositoryName, snapshotName, snapshots, deletionsInProgress); 
final List indicesForSnapshot = new ArrayList<>(); for (IndexId indexId : repositoryData.getIndices().values()) { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java index 1cdc6d228ac57..129a2c02ada8f 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java @@ -15,6 +15,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException; +import org.elasticsearch.snapshots.SnapshotException; +import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotRestoreException; import java.util.Arrays; @@ -30,9 +33,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest.Storage; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; public class SearchableSnapshotsRepositoryIntegTests extends BaseFrozenSearchableSnapshotsIntegTestCase { @@ -270,14 +275,13 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { 
assertAcked(client().admin().indices().prepareDelete(indexName)); final String mountedIndex = "mounted-index"; - final boolean deleteSnapshot = randomBoolean(); - final Settings indexSettings = deleteSnapshotIndexSettingsOrNull(deleteSnapshot); + final Settings indexSettings = deleteSnapshotIndexSettingsOrNull(false); logger.info("--> mounting snapshot of index [{}] as [{}] with index settings [{}]", indexName, mountedIndex, indexSettings); mountSnapshot(repository, snapshotOfIndex, indexName, mountedIndex, indexSettings, randomFrom(Storage.values())); assertThat( getDeleteSnapshotIndexSetting(mountedIndex), indexSettings.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) - ? equalTo(Boolean.toString(deleteSnapshot)) + ? equalTo("false") : nullValue() ); @@ -286,14 +290,13 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { assertAcked(client().admin().indices().prepareDelete(mountedIndex)); final String mountedIndexAgain = "mounted-index-again"; - final boolean deleteSnapshotAgain = deleteSnapshot == false; - final Settings indexSettingsAgain = deleteSnapshotIndexSettings(deleteSnapshotAgain); + final Settings indexSettingsAgain = deleteSnapshotIndexSettings(true); logger.info("--> mounting snapshot of index [{}] again as [{}] with index settings [{}]", indexName, mountedIndex, indexSettings); mountSnapshot(repository, snapshotOfIndex, indexName, mountedIndexAgain, indexSettingsAgain, randomFrom(Storage.values())); assertThat( getDeleteSnapshotIndexSetting(mountedIndexAgain), indexSettingsAgain.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) - ? equalTo(Boolean.toString(deleteSnapshotAgain)) + ? 
equalTo("true") : nullValue() ); @@ -312,9 +315,9 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { allOf( containsString("cannot mount snapshot [" + repository + '/'), containsString(':' + snapshotOfMountedIndex + "] as index [" + mountedIndex + "] with "), - containsString("[index.store.snapshot.delete_searchable_snapshot: " + deleteSnapshot + "]; another "), + containsString("[index.store.snapshot.delete_searchable_snapshot: false]; another "), containsString("index [" + mountedIndexAgain + '/'), - containsString("is mounted with [index.store.snapshot.delete_searchable_snapshot: " + deleteSnapshotAgain + "].") + containsString("is mounted with [index.store.snapshot.delete_searchable_snapshot: true].") ) ); assertAcked(client().admin().indices().prepareDelete("mounted-*")); @@ -401,6 +404,98 @@ public void testRestoreSearchableSnapshotIndexWithDifferentSettingsConflicts() t assertAcked(client().admin().indices().prepareDelete("restored-with-same-setting-*")); } + public void testSnapshotMarkedAsToDeleteCannotBeMounted() throws Exception { + final String suffix = getTestName().toLowerCase(Locale.ROOT); + final String repository = "repository-" + suffix; + createRepository(repository, FsRepository.TYPE, randomRepositorySettings()); + + final String index = "index-" + suffix; + createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + + final String snapshot = "snapshot-" + suffix; + createSnapshot(repository, snapshot, List.of(index)); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = mountSnapshot(repository, snapshot, index, deleteSnapshotIndexSettings(true)); + assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + 
assertAcked(client().admin().indices().prepareDelete(mounted)); + + final SnapshotException exception = expectThrows(SnapshotException.class, + () -> mountSnapshot(repository, snapshot, index, deleteSnapshotIndexSettings(true))); + assertThat(exception, anyOf(instanceOf(SnapshotMissingException.class), instanceOf(ConcurrentSnapshotExecutionException.class))); + assertThat(exception.getMessage(), + anyOf( + containsString("cannot restore a snapshot while a snapshot deletion is in-progress"), + containsString("cannot restore a snapshot already marked as deleted") + ) + ); + } + + public void testSnapshotMarkedAsToDeleteCannotBeCloned() throws Exception { + final String suffix = getTestName().toLowerCase(Locale.ROOT); + final String repository = "repository-" + suffix; + createRepository(repository, FsRepository.TYPE, randomRepositorySettings()); + + final String index = "index-" + suffix; + createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + + final String snapshot = "snapshot-" + suffix; + createSnapshot(repository, snapshot, List.of(index)); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = mountSnapshot(repository, snapshot, index, deleteSnapshotIndexSettings(true)); + assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + assertAcked(client().admin().indices().prepareDelete(mounted)); + + final SnapshotException exception = expectThrows(SnapshotException.class, + () -> client().admin().cluster().prepareCloneSnapshot(repository, snapshot, "clone-" + snapshot).setIndices(index).get()); + assertThat(exception, anyOf(instanceOf(SnapshotMissingException.class), instanceOf(ConcurrentSnapshotExecutionException.class))); + assertThat(exception.getMessage(), + anyOf( + containsString("cannot clone a snapshot 
that is marked as deleted"), + containsString("cannot clone from snapshot that is being deleted") + ) + ); + } + + public void testSnapshotMarkedAsToDeleteCannotBeRestored() throws Exception { + final String suffix = getTestName().toLowerCase(Locale.ROOT); + final String repository = "repository-" + suffix; + createRepository(repository, FsRepository.TYPE, randomRepositorySettings()); + + final String index = "index-" + suffix; + createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + + final String snapshot = "snapshot-" + suffix; + createSnapshot(repository, snapshot, List.of(index)); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = mountSnapshot(repository, snapshot, index, deleteSnapshotIndexSettings(true)); + assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + assertAcked(client().admin().indices().prepareDelete(mounted)); + + final SnapshotException exception = expectThrows(SnapshotException.class, + () -> client().admin() + .cluster() + .prepareRestoreSnapshot(repository, snapshot) + .setIndices(index) + .setWaitForCompletion(true) + .get()); + assertThat(exception, anyOf(instanceOf(SnapshotMissingException.class), instanceOf(ConcurrentSnapshotExecutionException.class))); + assertThat(exception.getMessage(), + anyOf( + containsString("cannot restore a snapshot while a snapshot deletion is in-progress"), + containsString("cannot restore a snapshot already marked as deleted") + ) + ); + } + private static Settings deleteSnapshotIndexSettings(boolean value) { return Settings.builder().put(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, value).build(); } From 704c8ea27b1d83fcf58035e7eaa93691873266c8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 19 Jul 2021 11:38:32 +0200 Subject: 
[PATCH 04/11] trigger snapshot deletions --- .../metadata/MetadataDeleteIndexService.java | 9 + .../cluster/metadata/RepositoryMetadata.java | 2 +- .../snapshots/SnapshotsService.java | 243 +++++++++++++- .../BaseSearchableSnapshotsIntegTestCase.java | 3 +- ...archableSnapshotsRepositoryIntegTests.java | 297 ++++++++++++++++-- 5 files changed, 528 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index 85f3296a63d94..d84c99e31f515 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -183,9 +184,17 @@ private static Map> listOfSnapshotsToDelete(final Cluste } final String repositoryName = repositoryNameFromIndexSettings(currentState, indexSettings); + final String snapshotName = indexSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_NAME_SETTING_KEY); final String snapshotUuid = indexSettings.get(SEARCHABLE_SNAPSHOTS_SNAPSHOT_UUID_SETTING_KEY); boolean canDeleteSnapshot = true; + + // TODO change this to an assertion once it becomes impossible to delete a snapshot that is mounted as an index + if (currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY) + .getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(new SnapshotId(snapshotName, snapshotUuid)))) { + continue; // this snapshot is part of an 
existing snapshot deletion in progress, nothing to do + } + for (IndexMetadata other : currentState.metadata()) { if (indicesToDelete.contains(other.getIndex())) { continue; // do not check indices that are going to be deleted diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java index 6fa87f9884805..44d779017d544 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java @@ -58,7 +58,7 @@ public RepositoryMetadata(String name, String type, Settings settings) { } public RepositoryMetadata(RepositoryMetadata metadata, long generation, long pendingGeneration) { - this(metadata.name, metadata.uuid, metadata.type, metadata.settings, generation, pendingGeneration, List.of()); + this(metadata.name, metadata.uuid, metadata.type, metadata.settings, generation, pendingGeneration, metadata.snapshotsToDelete); } public RepositoryMetadata( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index be59e6f004136..5000ad34e2206 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -67,7 +68,10 @@ import org.elasticsearch.common.settings.Settings; import 
org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -95,12 +99,14 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -986,6 +992,7 @@ public void applyClusterState(ClusterChangedEvent event) { newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event) ); + startDeletionOfSnapshotsToDelete(event.state()); } else if (snapshotCompletionListeners.isEmpty() == false) { // We have snapshot listeners but are not the master any more. 
Fail all waiting listeners except for those that already // have their snapshots finalizing (those that are already finalizing will fail on their own from to update the cluster @@ -1002,6 +1009,7 @@ public void applyClusterState(ClusterChangedEvent event) { } assert assertConsistentWithClusterState(event.state()); assert assertNoDanglingSnapshots(event.state()); + assert assertSnapshotsToDeleteConsistency(event.state()); } private boolean assertConsistentWithClusterState(ClusterState state) { @@ -1225,6 +1233,223 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS ); } + /** + * Asserts that a snapshot to delete cannot be included both in {@link RepositoryMetadata} and in {@link SnapshotDeletionsInProgress}. + * + * @param state the current {@link ClusterState} + * @return true if all checks succeed + */ + private static boolean assertSnapshotsToDeleteConsistency(ClusterState state) { + final Map> snapshotsToDelete = new HashMap<>(); + for (RepositoryMetadata repository : state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repositories()) { + snapshotsToDelete.put(repository.name(), Set.copyOf(repository.snapshotsToDelete())); + } + final Map> onGoingSnapshotDeletions = new HashMap<>(); + for (SnapshotDeletionsInProgress.Entry deletionInProgress : state.custom( + SnapshotDeletionsInProgress.TYPE, + SnapshotDeletionsInProgress.EMPTY + ).getEntries()) { + onGoingSnapshotDeletions.put(deletionInProgress.repository(), Set.copyOf(deletionInProgress.getSnapshots())); + } + for (String repository : Sets.union(snapshotsToDelete.keySet(), onGoingSnapshotDeletions.keySet())) { + Set set1 = snapshotsToDelete.getOrDefault(repository, Set.of()); + Set set2 = onGoingSnapshotDeletions.getOrDefault(repository, Set.of()); + assert Sets.intersection(set1, set2).isEmpty() + : "repository [" + + repository + + "] has snapshots deletions still marked as to delete [SnapshotDeletionsInProgress={" + + 
onGoingSnapshotDeletions.get(repository) + + "}, RepositoryMetadata={" + + snapshotsToDelete.get(repository) + + "}]"; + } + return true; + } + + /** + * Mutex used to modify snapshots to delete related objects. + */ + private final Object snapshotToDeleteMutex = new Object(); + + /** + * Set of snapshots to delete whose deletion will be triggered, per repository + */ + private final Map> onGoingSnapshotsDeletions = new HashMap<>(); + + /** + * Queue of snapshots whose deletion must be triggered. Each repository has a dedicated queue. + */ + private final Map> queues = new HashMap<>(); + + /** + * Number of on-going snapshots to delete per repository + */ + private final Map numberOfOnGoingSnapshotsToDeletes = new HashMap<>(); + + /** + * Maximum number of on-going snapshots to delete per repository + */ + private final int maxConcurrentSnapshotsToDeletes = 5; + + private final TimeValue failedSnapshotsToDeleteRetryInterval = TimeValue.timeValueSeconds(30L); + + /** + * Finds snapshots to delete in the the cluster state repositories metadata and triggers explicit snapshot delete requests. 
+ * + * @param state the current {@link ClusterState} + */ + private void startDeletionOfSnapshotsToDelete(final ClusterState state) { + final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + final boolean isMaster = state.nodes().isLocalNodeElectedMaster(); + + synchronized (snapshotToDeleteMutex) { + if (isMaster) { + for (RepositoryMetadata repository : repositories.repositories()) { + int newSnapshotsToDelete = 0; + for (SnapshotId snapshotId : repository.snapshotsToDelete()) { + if (onGoingSnapshotsDeletions.computeIfAbsent(repository.name(), r -> new LinkedHashSet<>()).add(snapshotId)) { + queues.computeIfAbsent(repository.name(), q -> new LinkedList<>()).add(snapshotId); + newSnapshotsToDelete += 1; + } + } + + for (int i = 0; i < Math.min(newSnapshotsToDelete, maxConcurrentSnapshotsToDeletes); i++) { + processSnapshotsToDelete(repository.name()); + } + } + + } else { + for (Map.Entry> queue : queues.entrySet()) { + final Set snapshotsToDelete = onGoingSnapshotsDeletions.get(queue.getKey()); + assert snapshotsToDelete != null : queue.getKey() + " has no snapshots to delete"; + + SnapshotId snapshotId = null; + while ((snapshotId = queue.getValue().poll()) != null) { + final boolean removed = snapshotsToDelete.remove(snapshotId); + assert removed : "snapshot to delete not found: " + snapshotId; + } + } + } + } + } + + private void processSnapshotsToDelete(final String repository) { + synchronized (snapshotToDeleteMutex) { + final int activeDeletes = numberOfOnGoingSnapshotsToDeletes.getOrDefault(repository, 0); + if (activeDeletes < maxConcurrentSnapshotsToDeletes) { + final Queue queue = queues.get(repository); + if (queue == null) { + assert onGoingSnapshotsDeletions.containsKey(repository) == false : repository; + assert numberOfOnGoingSnapshotsToDeletes.containsKey(repository) == false : repository; + return; + } + + final SnapshotId snapshotId = queue.poll(); + if (snapshotId != null) { 
+ assert onGoingSnapshotsDeletions.containsKey(repository) : repository; + assert onGoingSnapshotsDeletions.get(repository).contains(snapshotId) : snapshotId; + + numberOfOnGoingSnapshotsToDeletes.put(repository, activeDeletes + 1); + threadPool.generic().execute(new SnapshotToDeleteRunnable(repository, snapshotId)); + } + } + } + } + + private void removeSnapshotsToDelete(final String repository, final SnapshotId snapshotId) { + synchronized (snapshotToDeleteMutex) { + final Set snapshotsToDelete = onGoingSnapshotsDeletions.get(repository); + assert snapshotsToDelete != null : repository + " has no snapshots to delete"; + + final boolean removed = snapshotsToDelete.remove(snapshotId); + assert removed : "snapshot to delete not found: " + snapshotId; + + final int activeDeletes = numberOfOnGoingSnapshotsToDeletes.getOrDefault(repository, -1); + assert activeDeletes > 0 : repository + " has no active deletes: " + activeDeletes; + + if (activeDeletes > 1) { + numberOfOnGoingSnapshotsToDeletes.put(repository, activeDeletes - 1); + } else { + final Integer removedActiveDeletes = numberOfOnGoingSnapshotsToDeletes.remove(repository); + assert removedActiveDeletes != null && removedActiveDeletes == 1 : removedActiveDeletes; + final Set r = onGoingSnapshotsDeletions.remove(repository); + assert r == null || r.isEmpty() : repository + " has non empty " + r; + final Queue q = queues.remove(repository); + assert q == null || q.isEmpty() : repository + " has non empty " + q; + } + } + } + + /** + * A {@link Runnable} used to process the deletion of a snapshot marked as to delete. 
+ */ + private class SnapshotToDeleteRunnable extends AbstractRunnable { + + private final SnapshotId snapshotId; + private final String repository; + + private boolean retryLater; + + SnapshotToDeleteRunnable(String repository, SnapshotId snapshotId) { + super(); + this.repository = Objects.requireNonNull(repository); + this.snapshotId = Objects.requireNonNull(snapshotId); + this.retryLater = false; + } + + @Override + protected void doRun() throws Exception { + try { + logger.debug("[{}] triggering deletion of snapshot [{}]", repository, snapshotId); + final PlainActionFuture future = PlainActionFuture.newFuture(); + deleteSnapshots(new DeleteSnapshotRequest(repository, snapshotId.getName()), future); + future.actionGet(); + } finally { + removeSnapshotsToDelete(repository, snapshotId); + } + } + + @Override + public void onFailure(Exception e) { + if (e instanceof SnapshotMissingException) { + logger.debug("[{}] snapshot to delete [{}] is already deleted", repository, snapshotId); + + } else if (e instanceof RepositoryMissingException) { + logger.warn( + () -> new ParameterizedMessage("[{}] failed to delete snapshot [{}] (repository removed)", repository, snapshotId), + e + ); + + } else if (e instanceof ConcurrentSnapshotExecutionException) { + logger.debug( + "[{}] failed to delete snapshot [{}] (concurrent operation running, retrying in {})", + repository, + snapshotId, + failedSnapshotsToDeleteRetryInterval.getStringRep() + ); + retryLater = true; + + } else { + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot [{}]", repository, snapshotId), e); + } + } + + @Override + public void onAfter() { + if (retryLater == false) { + processSnapshotsToDelete(repository); + } else { + threadPool.scheduleUnlessShuttingDown( + failedSnapshotsToDeleteRetryInterval, + ThreadPool.Names.GENERIC, + () -> startDeletionOfSnapshotsToDelete(clusterService.state()) + ); + } + } + } + private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( 
ImmutableOpenMap snapshotShards, RoutingTable routingTable, @@ -1667,6 +1892,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (deletionToRun == null) { + startDeletionOfSnapshotsToDelete(newState); runNextQueuedOperation(repositoryData, repository, false); } else { deleteSnapshotsFromRepository(deletionToRun, repositoryData, newState.nodes().getMinNodeVersion()); @@ -1966,6 +2192,7 @@ public void onNoLongerMaster(String source) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { failSnapshotCompletionListeners(snapshot, failure); + startDeletionOfSnapshotsToDelete(newState); if (repositoryData != null) { runNextQueuedOperation(repositoryData, snapshot.getRepository(), true); } @@ -2554,6 +2781,7 @@ public final void clusterStateProcessed(String source, ClusterState oldState, Cl // TODO: be more efficient here, we could collect newly ready shard clones as we compute them and then directly start them // instead of looping over all possible clones to execute startExecutableClones(newState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY), null); + startDeletionOfSnapshotsToDelete(newState); } /** @@ -2714,7 +2942,8 @@ private boolean alreadyReassigned(String indexName, int shardId, Map> nodePlugins() { - return List.of(LocalStateSearchableSnapshots.class); + return CollectionUtils.appendToCopy(super.nodePlugins(), LocalStateSearchableSnapshots.class); } @Override diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java index 129a2c02ada8f..09d4f4cd5079a 100644 --- 
a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java @@ -8,23 +8,37 @@ package org.elasticsearch.xpack.searchablesnapshots; import org.apache.lucene.search.TotalHits; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException; +import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotRestoreException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static 
org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING; @@ -37,6 +51,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; @@ -159,6 +174,7 @@ public void testMountIndexWithDeletionOfSnapshotFailsIfNotSingleIndexSnapshot() containsString("snapshot contains [" + nbIndices + "] indices instead of 1.") ) ); + awaitNoMoreSnapshotsDeletions(); } public void testMountIndexWithDifferentDeletionOfSnapshot() throws Exception { @@ -215,6 +231,7 @@ public void testMountIndexWithDifferentDeletionOfSnapshot() throws Exception { assertAcked(client().admin().indices().prepareDelete(mountedAgain)); assertAcked(client().admin().indices().prepareDelete(mounted)); + awaitNoMoreSnapshotsDeletions(); } public void testDeletionOfSnapshotSettingCannotBeUpdated() throws Exception { @@ -261,6 +278,7 @@ public void testDeletionOfSnapshotSettingCannotBeUpdated() throws Exception { ); assertAcked(client().admin().indices().prepareDelete(mounted)); + awaitNoMoreSnapshotsDeletions(); } public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { @@ -280,9 +298,7 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { mountSnapshot(repository, snapshotOfIndex, indexName, mountedIndex, indexSettings, randomFrom(Storage.values())); assertThat( getDeleteSnapshotIndexSetting(mountedIndex), - indexSettings.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) - ? equalTo("false") - : nullValue() + indexSettings.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) ? 
equalTo("false") : nullValue() ); final String snapshotOfMountedIndex = "snapshot-of-mounted-index"; @@ -295,9 +311,7 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { mountSnapshot(repository, snapshotOfIndex, indexName, mountedIndexAgain, indexSettingsAgain, randomFrom(Storage.values())); assertThat( getDeleteSnapshotIndexSetting(mountedIndexAgain), - indexSettingsAgain.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) - ? equalTo("true") - : nullValue() + indexSettingsAgain.hasValue(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION) ? equalTo("true") : nullValue() ); logger.info("--> restoring snapshot of searchable snapshot index [{}] should be conflicting", mountedIndex); @@ -321,6 +335,7 @@ public void testRestoreSearchableSnapshotIndexConflicts() throws Exception { ) ); assertAcked(client().admin().indices().prepareDelete("mounted-*")); + awaitNoMoreSnapshotsDeletions(); } public void testRestoreSearchableSnapshotIndexWithDifferentSettingsConflicts() throws Exception { @@ -402,6 +417,7 @@ public void testRestoreSearchableSnapshotIndexWithDifferentSettingsConflicts() t assertAcked(client().admin().indices().prepareDelete("mounted-*")); assertAcked(client().admin().indices().prepareDelete("restored-with-same-setting-*")); + awaitNoMoreSnapshotsDeletions(); } public void testSnapshotMarkedAsToDeleteCannotBeMounted() throws Exception { @@ -422,15 +438,20 @@ public void testSnapshotMarkedAsToDeleteCannotBeMounted() throws Exception { assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); assertAcked(client().admin().indices().prepareDelete(mounted)); - final SnapshotException exception = expectThrows(SnapshotException.class, - () -> mountSnapshot(repository, snapshot, index, deleteSnapshotIndexSettings(true))); + final SnapshotException exception = expectThrows( + SnapshotException.class, + () -> mountSnapshot(repository, snapshot, index, deleteSnapshotIndexSettings(true)) 
+ ); assertThat(exception, anyOf(instanceOf(SnapshotMissingException.class), instanceOf(ConcurrentSnapshotExecutionException.class))); - assertThat(exception.getMessage(), + assertThat( + exception.getMessage(), anyOf( containsString("cannot restore a snapshot while a snapshot deletion is in-progress"), - containsString("cannot restore a snapshot already marked as deleted") + containsString("cannot restore a snapshot already marked as deleted"), + containsString(snapshot + "] is missing") ) ); + awaitNoMoreSnapshotsDeletions(); } public void testSnapshotMarkedAsToDeleteCannotBeCloned() throws Exception { @@ -451,15 +472,20 @@ public void testSnapshotMarkedAsToDeleteCannotBeCloned() throws Exception { assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); assertAcked(client().admin().indices().prepareDelete(mounted)); - final SnapshotException exception = expectThrows(SnapshotException.class, - () -> client().admin().cluster().prepareCloneSnapshot(repository, snapshot, "clone-" + snapshot).setIndices(index).get()); + final SnapshotException exception = expectThrows( + SnapshotException.class, + () -> client().admin().cluster().prepareCloneSnapshot(repository, snapshot, "clone-" + snapshot).setIndices(index).get() + ); assertThat(exception, anyOf(instanceOf(SnapshotMissingException.class), instanceOf(ConcurrentSnapshotExecutionException.class))); - assertThat(exception.getMessage(), + assertThat( + exception.getMessage(), anyOf( containsString("cannot clone a snapshot that is marked as deleted"), - containsString("cannot clone from snapshot that is being deleted") + containsString("cannot clone from snapshot that is being deleted"), + containsString(snapshot + "] is missing") ) ); + awaitNoMoreSnapshotsDeletions(); } public void testSnapshotMarkedAsToDeleteCannotBeRestored() throws Exception { @@ -480,20 +506,228 @@ public void testSnapshotMarkedAsToDeleteCannotBeRestored() throws Exception { 
assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); assertAcked(client().admin().indices().prepareDelete(mounted)); - final SnapshotException exception = expectThrows(SnapshotException.class, - () -> client().admin() - .cluster() - .prepareRestoreSnapshot(repository, snapshot) - .setIndices(index) - .setWaitForCompletion(true) - .get()); + final SnapshotException exception = expectThrows( + SnapshotException.class, + () -> client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setIndices(index).setWaitForCompletion(true).get() + ); assertThat(exception, anyOf(instanceOf(SnapshotMissingException.class), instanceOf(ConcurrentSnapshotExecutionException.class))); - assertThat(exception.getMessage(), + assertThat( + exception.getMessage(), anyOf( containsString("cannot restore a snapshot while a snapshot deletion is in-progress"), - containsString("cannot restore a snapshot already marked as deleted") + containsString("cannot restore a snapshot already marked as deleted"), + containsString(snapshot + "] is missing") ) ); + awaitNoMoreSnapshotsDeletions(); + } + + public void testSearchableSnapshotIsDeletedAfterIndexIsDeleted() throws Exception { + final String suffix = getTestName().toLowerCase(Locale.ROOT); + final String repository = "repository-" + suffix; + createRepository(repository, FsRepository.TYPE, randomRepositorySettings()); + + final String index = "index-" + suffix; + createAndPopulateIndex(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + + final String snapshot = "snapshot-" + suffix; + createSnapshot(repository, snapshot, List.of(index)); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = "mounted-" + suffix; + mountSnapshot(repository, snapshot, index, mounted, 
deleteSnapshotIndexSettings(true), randomFrom(Storage.values())); + assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + + assertAcked(client().admin().indices().prepareDelete(mounted)); + awaitNoMoreSnapshotsDeletions(); + + expectThrows(SnapshotMissingException.class, () -> clusterAdmin().prepareDeleteSnapshot(repository, snapshot).get()); + } + + public void testSearchableSnapshotsAreDeletedAfterMountedIndicesAreDeleted() throws Exception { + final String repository = "repository-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(logger, repository, "mock"); + + final List snapshots = new ArrayList<>(); + final int nbSnapshots = randomIntBetween(2, 10); + for (int s = 0; s < nbSnapshots; s++) { + createAndPopulateIndex("index", Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true)); + final String snapshot = "snapshot-" + s; + createSnapshot(repository, snapshot, List.of("index")); + assertAcked(client().admin().indices().prepareDelete("index")); + snapshots.add(snapshot); + } + + final int nbIndices = randomIntBetween(2, 10); + final Map mounts = new HashMap<>(nbIndices); + for (int i = 0; i < nbIndices; i++) { + final String mounted = "mounted-" + i; + final String snapshot = randomFrom(snapshots); + logger.info("--> mounting snapshot [{}] as index [{}]", snapshot, mounted); + mountSnapshot(repository, snapshot, "index", mounted, deleteSnapshotIndexSettings(true), randomFrom(Storage.values())); + mounts.put(mounted, snapshot); + } + + awaitNoMoreRunningOperations(); + + final List> futures = new ArrayList<>(); + blockAllDataNodes(repository); + + for (int i = 0; i < nbSnapshots; i++) { + final ActionFuture future; + switch (randomInt(2)) { + case 0: + future = client().admin() + .cluster() + .prepareRestoreSnapshot(repository, "snapshot-" + i) + .setIndices("index") + .setRenamePattern("(.+)") + .setRenameReplacement("$1-restored-" + i) + .setWaitForCompletion(true) + .execute(); + 
break; + case 1: + future = client().admin() + .cluster() + .prepareCloneSnapshot(repository, "snapshot-" + i, "clone-" + i) + .setIndices("index") + .execute(); + break; + case 2: + future = client().admin().cluster().prepareDeleteSnapshot(repository, "snapshot-" + i).execute(); + break; + default: + throw new AssertionError(); + } + futures.add(future); + } + + awaitClusterState( + state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() > 0 + || state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).isEmpty() == false + || state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() + ); + + final List remainingIndicesToDelete = new ArrayList<>(mounts.keySet()); + while (remainingIndicesToDelete.isEmpty() == false) { + final List toDelete = randomValueOtherThanMany(List::isEmpty, () -> randomSubsetOf(remainingIndicesToDelete)); + logger.info("--> deleting mounted indices [{}]", toDelete); + futures.add(client().admin().indices().prepareDelete(toDelete.toArray(String[]::new)).execute()); + toDelete.forEach(remainingIndicesToDelete::remove); + } + + unblockAllDataNodes(repository); + + assertBusy(() -> { + for (ActionFuture future : futures) { + assertTrue(future.isDone()); + try { + Object response = future.get(); + if (response instanceof AcknowledgedResponse) { + assertAcked((AcknowledgedResponse) response); + + } else if (response instanceof RestoreSnapshotResponse) { + final RestoreSnapshotResponse restoreResponse = ((RestoreSnapshotResponse) response); + assertThat(restoreResponse.getRestoreInfo().successfulShards(), greaterThanOrEqualTo(1)); + assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0)); + + } else { + throw new AssertionError("Unknown response type: " + response); + } + } catch (ExecutionException e) { + Throwable cause = ExceptionsHelper.unwrap(e, SnapshotException.class); + if (cause == null) { + cause = ExceptionsHelper.unwrapCause(e); + } 
+ assertThat( + cause, + anyOf(instanceOf(ConcurrentSnapshotExecutionException.class), instanceOf(SnapshotMissingException.class)) + ); + assertThat( + cause.getMessage(), + anyOf( + containsString("cannot restore a snapshot already marked as deleted"), + containsString("cannot clone a snapshot that is marked as deleted"), + containsString("cannot clone from snapshot that is being deleted"), + allOf(containsString('[' + repository + ":snapshot-"), containsString(" is missing")) + ) + ); + } + } + }); + + awaitNoMoreSnapshotsDeletions(); + + assertBusy(() -> { + for (Map.Entry mount : mounts.entrySet()) { + expectThrows( + IndexNotFoundException.class, + "Expected index to be deleted: " + mount.getKey(), + () -> client().admin().indices().prepareGetIndex().setIndices(mount.getKey()).get() + ); + expectThrows( + SnapshotMissingException.class, + "Expected snapshot to be deleted: " + mount.getValue(), + () -> client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(mount.getValue()).get() + ); + } + }); + } + + public void testSearchableSnapshotIsDeletedWithOnGoingRestore() throws Exception { + final String repository = "repository-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(logger, repository, "mock"); + + final String index = "index"; + assertAcked(prepareCreate(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); + ensureGreen(index); + populateIndex(index, 5_000); + refresh(index); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + final NumShards numShards = getNumShards(index); + + final String snapshot = "snapshot"; + createSnapshot(repository, snapshot, List.of(index)); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = "mounted-" + index; + mountSnapshot(repository, snapshot, index, mounted, deleteSnapshotIndexSettings(true), randomFrom(Storage.values())); + 
assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + + blockAllDataNodes(repository); + + final ActionFuture restoreFuture = client().admin() + .cluster() + .prepareRestoreSnapshot(repository, snapshot) + .setIndices(index) + .setRenamePattern("(.+)") + .setRenameReplacement("restored-$1") + .setWaitForCompletion(true) + .execute(); + + awaitClusterState(state -> state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).isEmpty() == false); + + final ActionFuture deleteFuture = client().admin().indices().prepareDelete(mounted).execute(); + + awaitClusterState( + state -> state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repository(repository) + .hasSnapshotsToDelete() + ); + + unblockAllDataNodes(repository); + awaitNoMoreSnapshotsDeletions(); + + final RestoreInfo restoreInfoResponse = restoreFuture.actionGet().getRestoreInfo(); + assertThat(restoreInfoResponse.successfulShards(), equalTo(numShards.numPrimaries)); + assertThat(restoreInfoResponse.failedShards(), equalTo(0)); + assertAcked(deleteFuture.get()); + + expectThrows(SnapshotMissingException.class, () -> clusterAdmin().prepareDeleteSnapshot(repository, snapshot).get()); } private static Settings deleteSnapshotIndexSettings(boolean value) { @@ -515,4 +749,21 @@ private static String getDeleteSnapshotIndexSetting(String indexName) { final GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings(indexName).get(); return getSettingsResponse.getSetting(indexName, SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION); } + + protected void awaitNoMoreSnapshotsDeletions() throws Exception { + final String master = internalCluster().getMasterName(); + awaitClusterState(logger, master, state -> { + SnapshotDeletionsInProgress deletions = state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY); + if (deletions.hasDeletionsInProgress()) { + return false; + } + 
RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + for (RepositoryMetadata repository : repositories.repositories()) { + if (repository.hasSnapshotsToDelete()) { + return false; + } + } + return true; + }); + } } From 955a5fc8cbe8bfb42892e0bf8d944185f9690264 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 21 Jul 2021 10:17:14 +0200 Subject: [PATCH 05/11] add settings --- .../common/settings/ClusterSettings.java | 2 + .../snapshots/SnapshotsService.java | 167 +++++++++++------- 2 files changed, 110 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index d50f8e92152e1..966f6a4c8dce6 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -478,6 +478,8 @@ public void apply(Settings value, Settings current, Settings previous) { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, + SnapshotsService.MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING, + SnapshotsService.SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING, RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING, FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5000ad34e2206..dcdea3b8c2273 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -222,6 +222,15 @@ public SnapshotsService( maxConcurrentOperations = 
MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); + maxConcurrentSnapshotsToDeletes = MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING, + i -> maxConcurrentSnapshotsToDeletes = i + ); + snapshotsToDeleteRetryInterval = SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING, t -> snapshotsToDeleteRetryInterval = t); } this.systemIndexDescriptorMap = systemIndexDescriptorMap; } @@ -1234,39 +1243,23 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } /** - * Asserts that a snapshot to delete cannot be included both in {@link RepositoryMetadata} and in {@link SnapshotDeletionsInProgress}. 
- * - * @param state the current {@link ClusterState} - * @return true if all checks succeed + * Maximum number of on-going snapshots to delete per repository */ - private static boolean assertSnapshotsToDeleteConsistency(ClusterState state) { - final Map> snapshotsToDelete = new HashMap<>(); - for (RepositoryMetadata repository : state.metadata() - .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) - .repositories()) { - snapshotsToDelete.put(repository.name(), Set.copyOf(repository.snapshotsToDelete())); - } - final Map> onGoingSnapshotDeletions = new HashMap<>(); - for (SnapshotDeletionsInProgress.Entry deletionInProgress : state.custom( - SnapshotDeletionsInProgress.TYPE, - SnapshotDeletionsInProgress.EMPTY - ).getEntries()) { - onGoingSnapshotDeletions.put(deletionInProgress.repository(), Set.copyOf(deletionInProgress.getSnapshots())); - } - for (String repository : Sets.union(snapshotsToDelete.keySet(), onGoingSnapshotDeletions.keySet())) { - Set set1 = snapshotsToDelete.getOrDefault(repository, Set.of()); - Set set2 = onGoingSnapshotDeletions.getOrDefault(repository, Set.of()); - assert Sets.intersection(set1, set2).isEmpty() - : "repository [" - + repository - + "] has snapshots deletions still marked as to delete [SnapshotDeletionsInProgress={" - + onGoingSnapshotDeletions.get(repository) - + "}, RepositoryMetadata={" - + snapshotsToDelete.get(repository) - + "}]"; - } - return true; - } + public static final Setting MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING = Setting.intSetting( + "snapshot.snapshots_to_delete.max_concurrent_operations", + 5, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING = Setting.timeSetting( + "snapshot.snapshots_to_delete.retry_interval", + TimeValue.timeValueSeconds(30L), + TimeValue.timeValueSeconds(1L), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); /** * Mutex used to modify snapshots to delete 
related objects. @@ -1281,19 +1274,15 @@ private static boolean assertSnapshotsToDeleteConsistency(ClusterState state) { /** * Queue of snapshots whose deletion must be triggered. Each repository has a dedicated queue. */ - private final Map> queues = new HashMap<>(); + private final Map> queuesSnapshotsDeletions = new HashMap<>(); /** * Number of on-going snapshots to delete per repository */ private final Map numberOfOnGoingSnapshotsToDeletes = new HashMap<>(); - /** - * Maximum number of on-going snapshots to delete per repository - */ - private final int maxConcurrentSnapshotsToDeletes = 5; - - private final TimeValue failedSnapshotsToDeleteRetryInterval = TimeValue.timeValueSeconds(30L); + private volatile int maxConcurrentSnapshotsToDeletes; + private volatile TimeValue snapshotsToDeleteRetryInterval; /** * Finds snapshots to delete in the the cluster state repositories metadata and triggers explicit snapshot delete requests. @@ -1310,7 +1299,7 @@ private void startDeletionOfSnapshotsToDelete(final ClusterState state) { int newSnapshotsToDelete = 0; for (SnapshotId snapshotId : repository.snapshotsToDelete()) { if (onGoingSnapshotsDeletions.computeIfAbsent(repository.name(), r -> new LinkedHashSet<>()).add(snapshotId)) { - queues.computeIfAbsent(repository.name(), q -> new LinkedList<>()).add(snapshotId); + queuesSnapshotsDeletions.computeIfAbsent(repository.name(), q -> new LinkedList<>()).add(snapshotId); newSnapshotsToDelete += 1; } } @@ -1321,17 +1310,18 @@ private void startDeletionOfSnapshotsToDelete(final ClusterState state) { } } else { - for (Map.Entry> queue : queues.entrySet()) { + for (Map.Entry> queue : queuesSnapshotsDeletions.entrySet()) { final Set snapshotsToDelete = onGoingSnapshotsDeletions.get(queue.getKey()); assert snapshotsToDelete != null : queue.getKey() + " has no snapshots to delete"; - SnapshotId snapshotId = null; + SnapshotId snapshotId; while ((snapshotId = queue.getValue().poll()) != null) { final boolean removed = 
snapshotsToDelete.remove(snapshotId); assert removed : "snapshot to delete not found: " + snapshotId; } } } + assert assertSnapshotsToDeleteInvariants(); } } @@ -1339,22 +1329,20 @@ private void processSnapshotsToDelete(final String repository) { synchronized (snapshotToDeleteMutex) { final int activeDeletes = numberOfOnGoingSnapshotsToDeletes.getOrDefault(repository, 0); if (activeDeletes < maxConcurrentSnapshotsToDeletes) { - final Queue queue = queues.get(repository); - if (queue == null) { - assert onGoingSnapshotsDeletions.containsKey(repository) == false : repository; - assert numberOfOnGoingSnapshotsToDeletes.containsKey(repository) == false : repository; - return; - } - - final SnapshotId snapshotId = queue.poll(); - if (snapshotId != null) { + final Queue queue = queuesSnapshotsDeletions.get(repository); + if (queue != null) { assert onGoingSnapshotsDeletions.containsKey(repository) : repository; - assert onGoingSnapshotsDeletions.get(repository).contains(snapshotId) : snapshotId; - numberOfOnGoingSnapshotsToDeletes.put(repository, activeDeletes + 1); - threadPool.generic().execute(new SnapshotToDeleteRunnable(repository, snapshotId)); + final SnapshotId snapshotId = queue.poll(); + if (snapshotId != null) { + assert onGoingSnapshotsDeletions.get(repository).contains(snapshotId) : snapshotId; + + numberOfOnGoingSnapshotsToDeletes.put(repository, activeDeletes + 1); + threadPool.generic().execute(new SnapshotToDeleteRunnable(repository, snapshotId)); + } } } + assert assertSnapshotsToDeleteInvariants(); } } @@ -1374,11 +1362,14 @@ private void removeSnapshotsToDelete(final String repository, final SnapshotId s } else { final Integer removedActiveDeletes = numberOfOnGoingSnapshotsToDeletes.remove(repository); assert removedActiveDeletes != null && removedActiveDeletes == 1 : removedActiveDeletes; + final Set r = onGoingSnapshotsDeletions.remove(repository); - assert r == null || r.isEmpty() : repository + " has non empty " + r; - final Queue q = 
queues.remove(repository); - assert q == null || q.isEmpty() : repository + " has non empty " + q; + assert r.isEmpty() : repository + " has non empty " + r; + + final Queue q = queuesSnapshotsDeletions.remove(repository); + assert q.isEmpty() : repository + " has non empty " + q; } + assert assertSnapshotsToDeleteInvariants(); } } @@ -1427,7 +1418,7 @@ public void onFailure(Exception e) { "[{}] failed to delete snapshot [{}] (concurrent operation running, retrying in {})", repository, snapshotId, - failedSnapshotsToDeleteRetryInterval.getStringRep() + snapshotsToDeleteRetryInterval.getStringRep() ); retryLater = true; @@ -1442,7 +1433,7 @@ public void onAfter() { processSnapshotsToDelete(repository); } else { threadPool.scheduleUnlessShuttingDown( - failedSnapshotsToDeleteRetryInterval, + snapshotsToDeleteRetryInterval, ThreadPool.Names.GENERIC, () -> startDeletionOfSnapshotsToDelete(clusterService.state()) ); @@ -1450,6 +1441,64 @@ public void onAfter() { } } + private boolean assertSnapshotsToDeleteInvariants() { + assert Thread.holdsLock(snapshotToDeleteMutex); + + final Set repositories = new HashSet<>(onGoingSnapshotsDeletions.keySet()); + repositories.addAll(queuesSnapshotsDeletions.keySet()); + repositories.addAll(numberOfOnGoingSnapshotsToDeletes.keySet()); + + for (String repository : repositories) { + if (onGoingSnapshotsDeletions.containsKey(repository) + || queuesSnapshotsDeletions.containsKey(repository) + || numberOfOnGoingSnapshotsToDeletes.containsKey(repository)) { + assert onGoingSnapshotsDeletions.get(repository) != null; + assert queuesSnapshotsDeletions.get(repository) != null; + assert numberOfOnGoingSnapshotsToDeletes.get(repository) != null; + } else { + assert onGoingSnapshotsDeletions.get(repository) == null; + assert queuesSnapshotsDeletions.get(repository) == null; + assert numberOfOnGoingSnapshotsToDeletes.get(repository) == null; + } + } + return true; + } + + /** + * Asserts that a snapshot to delete cannot be included both in {@link 
RepositoryMetadata} and in {@link SnapshotDeletionsInProgress}. + * + * @param state the current {@link ClusterState} + * @return true if all checks succeed + */ + private static boolean assertSnapshotsToDeleteConsistency(ClusterState state) { + final Map> snapshotsToDelete = new HashMap<>(); + for (RepositoryMetadata repository : state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repositories()) { + snapshotsToDelete.put(repository.name(), Set.copyOf(repository.snapshotsToDelete())); + } + final Map> onGoingSnapshotDeletions = new HashMap<>(); + for (SnapshotDeletionsInProgress.Entry deletionInProgress : state.custom( + SnapshotDeletionsInProgress.TYPE, + SnapshotDeletionsInProgress.EMPTY + ).getEntries()) { + onGoingSnapshotDeletions.put(deletionInProgress.repository(), Set.copyOf(deletionInProgress.getSnapshots())); + } + for (String repository : Sets.union(snapshotsToDelete.keySet(), onGoingSnapshotDeletions.keySet())) { + Set set1 = snapshotsToDelete.getOrDefault(repository, Set.of()); + Set set2 = onGoingSnapshotDeletions.getOrDefault(repository, Set.of()); + assert Sets.intersection(set1, set2).isEmpty() + : "repository [" + + repository + + "] has snapshots deletions still marked as to delete [SnapshotDeletionsInProgress={" + + onGoingSnapshotDeletions.get(repository) + + "}, RepositoryMetadata={" + + snapshotsToDelete.get(repository) + + "}]"; + } + return true; + } + private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( ImmutableOpenMap snapshotShards, RoutingTable routingTable, From e1995d7f4c64aa0e9e9307eb00435371e0d6e850 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 3 Sep 2021 17:29:50 +0200 Subject: [PATCH 06/11] Remove other repository name check --- .../cluster/metadata/MetadataDeleteIndexService.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java 
b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index d84c99e31f515..28b388bc1610a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -207,10 +207,6 @@ private static Map> listOfSnapshotsToDelete(final Cluste if (Objects.equals(snapshotUuid, otherSnapshotUuid) == false) { continue; // other index is backed by a different snapshot, skip } - final String otherRepositoryName = repositoryNameFromIndexSettings(currentState, otherSettings); - if (Objects.equals(repositoryName, otherRepositoryName) == false) { - continue; // other index is backed by a snapshot from a different repository, skip - } assert otherSettings.getAsBoolean(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, false) : other; canDeleteSnapshot = false; // another index is using the same snapshot, do not delete break; From e13a11a3ff4df93af60fbc2f8fdfdb1185c7abf8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 3 Sep 2021 17:30:13 +0200 Subject: [PATCH 07/11] Do not re read repository name --- .../cluster/metadata/MetadataDeleteIndexService.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index 28b388bc1610a..fd086db6be47e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -138,17 +138,10 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) final Map> snapshotsToDelete = listOfSnapshotsToDelete(currentState, indicesToDelete); if (snapshotsToDelete.isEmpty() == false) { RepositoriesMetadata repositories = 
currentState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); - boolean changed = false; for (Map.Entry> snapshotToDelete : snapshotsToDelete.entrySet()) { - RepositoryMetadata repository = repositories.repository(snapshotToDelete.getKey()); - if (repository != null) { - repositories = repositories.addSnapshotsToDelete(repository.name(), snapshotToDelete.getValue()); - changed = true; - } - } - if (changed) { - metadataBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); + repositories = repositories.addSnapshotsToDelete(snapshotToDelete.getKey(), snapshotToDelete.getValue()); } + metadataBuilder.putCustom(RepositoriesMetadata.TYPE, repositories); } Metadata newMetadata = metadataBuilder.build(); From 4df53d73e9cd3be9ebe98023902a6b04d3db8dc9 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 3 Sep 2021 17:33:24 +0200 Subject: [PATCH 08/11] Delete snapshots using UUIDs --- .../delete/TransportDeleteSnapshotAction.java | 2 +- .../snapshots/SnapshotsService.java | 49 +++++++++++++++---- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index 32fdb285cb808..6de81eba3acdb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -64,6 +64,6 @@ protected void masterOperation( ClusterState state, final ActionListener listener ) { - snapshotsService.deleteSnapshots(request, listener.map(v -> AcknowledgedResponse.TRUE)); + snapshotsService.deleteSnapshotsByName(request, listener.map(v -> AcknowledgedResponse.TRUE)); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java 
b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5f863d5736407..bfaae3d0b28a6 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1473,7 +1473,7 @@ protected void doRun() throws Exception { try { logger.debug("[{}] triggering deletion of snapshot [{}]", repository, snapshotId); final PlainActionFuture future = PlainActionFuture.newFuture(); - deleteSnapshots(new DeleteSnapshotRequest(repository, snapshotId.getName()), future); + deleteSnapshotsByUuid(new DeleteSnapshotRequest(repository, snapshotId.getUUID()), future); future.actionGet(); } finally { removeSnapshotsToDelete(repository, snapshotId); @@ -2354,18 +2354,47 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { } /** - * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. + * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. Snapshots + * to delete are identified by their names. + * + * @param request delete snapshot request + * @param listener listener + */ + public void deleteSnapshotsByName(final DeleteSnapshotRequest request, final ActionListener listener) { + deleteSnapshots(SnapshotId::getName, request, listener); + } + + /** + * Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them. Snapshots + * to delete are identified by their UUIDs. * * @param request delete snapshot request * @param listener listener */ - public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) { + private void deleteSnapshotsByUuid(final DeleteSnapshotRequest request, final ActionListener listener) { + deleteSnapshots(SnapshotId::getUUID, request, listener); + } + + /** + * Deletes snapshots from the repository. 
In-progress snapshots matched by the delete will be aborted before deleting them. + * Snapshots to delete are identified by converting their {@link SnapshotId} to a {@link String} using the mapping function + * {@code mapping}; the resulting string is then compared to the snapshots names/uuids/patterns to match against. + * + * @param mapping the mapping function used to match the {@link SnapshotId} against the given snapshotNamesOrUuids + * @param request the {@link DeleteSnapshotRequest} + * @param listener listener + */ + private void deleteSnapshots( + final Function mapping, + final DeleteSnapshotRequest request, + final ActionListener listener + ) { final String repositoryName = request.repository(); - final String[] snapshotNames = request.snapshots(); + final String[] snapshotNamesOrUuids = request.snapshots(); logger.info( () -> new ParameterizedMessage( "deleting snapshots [{}] from repository [{}]", - Strings.arrayToCommaDelimitedString(snapshotNames), + Strings.arrayToCommaDelimitedString(snapshotNamesOrUuids), repositoryName ) ); @@ -2394,7 +2423,7 @@ public ClusterState execute(ClusterState currentState) { final SnapshotsInProgress snapshotsInProgress = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { final SnapshotId snapshotId = entry.snapshot().getSnapshotId(); - if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNames, snapshotId.getName())) { + if (entry.repository().equals(repositoryName) && Regex.simpleMatch(snapshotNamesOrUuids, mapping.apply(snapshotId))) { snapshotIds.add(snapshotId); } } @@ -2402,8 +2431,8 @@ public ClusterState execute(ClusterState currentState) { // find snapshots to delete in repository data final Map snapshotsIdsInRepository = repositoryData.getSnapshotIds() .stream() - .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); - for (String snapshotOrPattern : snapshotNames) { + 
.collect(Collectors.toMap(mapping, Function.identity())); + for (String snapshotOrPattern : snapshotNamesOrUuids) { if (Regex.isSimpleMatchPattern(snapshotOrPattern)) { for (Map.Entry entry : snapshotsIdsInRepository.entrySet()) { if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { @@ -2413,7 +2442,7 @@ public ClusterState execute(ClusterState currentState) { } else { final SnapshotId foundId = snapshotsIdsInRepository.get(snapshotOrPattern); if (foundId == null) { - if (snapshotIds.stream().noneMatch(snapshotId -> snapshotId.getName().equals(snapshotOrPattern))) { + if (snapshotIds.stream().map(mapping).noneMatch(snapshot -> snapshot.equals(snapshotOrPattern))) { throw new SnapshotMissingException(repositoryName, snapshotOrPattern); } } else { @@ -2570,7 +2599,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } } - }, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure); + }, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNamesOrUuids), listener::onFailure); } /** From 3e8dc653f9d939ba35857dc704f66a036ee5c990 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 6 Sep 2021 16:28:05 +0200 Subject: [PATCH 09/11] rework snapshot deletion --- .../metadata/MetadataDeleteIndexService.java | 69 +-- .../cluster/metadata/RepositoryMetadata.java | 4 + .../common/settings/ClusterSettings.java | 1 - .../repositories/RepositoriesService.java | 2 +- .../snapshots/SnapshotsService.java | 437 ++++++++---------- ...archableSnapshotsRepositoryIntegTests.java | 136 +++++- 6 files changed, 367 insertions(+), 282 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java index fd086db6be47e..2ff35b7a533f6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java +++ 
b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDeleteIndexService.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; -import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.SearchableSnapshotsSettings; import org.elasticsearch.snapshots.SnapshotId; @@ -38,12 +37,10 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import static java.util.Collections.emptyList; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; @@ -74,13 +71,15 @@ public void deleteIndices(final DeleteIndexClusterStateUpdateRequest request, fi throw new IllegalArgumentException("Index name is required"); } - clusterService.submitStateUpdateTask("delete-index " + Arrays.toString(request.indices()), + clusterService.submitStateUpdateTask( + "delete-index " + Arrays.toString(request.indices()), new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { @Override public ClusterState execute(final ClusterState currentState) { return deleteIndices(currentState, Sets.newHashSet(request.indices())); } - }); + } + ); } /** @@ -95,8 +94,13 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) IndexAbstraction.DataStream parent = meta.getIndicesLookup().get(im.getIndex().getName()).getParentDataStream(); if (parent != null) { if (parent.getWriteIndex().equals(im)) { - throw new IllegalArgumentException("index [" + 
index.getName() + "] is the write index for data stream [" + - parent.getName() + "] and cannot be deleted"); + throw new IllegalArgumentException( + "index [" + + index.getName() + + "] is the write index for data stream [" + + parent.getName() + + "] and cannot be deleted" + ); } else { backingIndices.put(index, parent.getDataStream()); } @@ -107,8 +111,11 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) // Check if index deletion conflicts with any running snapshots Set snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete); if (snapshottingIndices.isEmpty() == false) { - throw new SnapshotInProgressException("Cannot delete indices that are being snapshotted: " + snapshottingIndices + - ". Try again after snapshot finishes or cancel the currently running snapshot."); + throw new SnapshotInProgressException( + "Cannot delete indices that are being snapshotted: " + + snapshottingIndices + + ". Try again after snapshot finishes or cancel the currently running snapshot." + ); } RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); @@ -131,8 +138,12 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) // add tombstones to the cluster state for each deleted index final IndexGraveyard currentGraveyard = graveyardBuilder.addTombstones(indices).build(settings); metadataBuilder.indexGraveyard(currentGraveyard); // the new graveyard set on the metadata - logger.trace("{} tombstones purged from the cluster state. Previous tombstone size: {}. Current tombstone size: {}.", - graveyardBuilder.getNumPurged(), previousGraveyardSize, currentGraveyard.getTombstones().size()); + logger.trace( + "{} tombstones purged from the cluster state. Previous tombstone size: {}. 
Current tombstone size: {}.", + graveyardBuilder.getNumPurged(), + previousGraveyardSize, + currentGraveyard.getTombstones().size() + ); // add snapshot(s) marked as to delete to the cluster state final Map> snapshotsToDelete = listOfSnapshotsToDelete(currentState, indicesToDelete); @@ -158,13 +169,14 @@ public ClusterState deleteIndices(ClusterState currentState, Set indices) } return allocationService.reroute( - ClusterState.builder(currentState) - .routingTable(routingTableBuilder.build()) - .metadata(newMetadata) - .blocks(blocks) - .customs(customs) - .build(), - "deleted indices [" + indices + "]"); + ClusterState.builder(currentState) + .routingTable(routingTableBuilder.build()) + .metadata(newMetadata) + .blocks(blocks) + .customs(customs) + .build(), + "deleted indices [" + indices + "]" + ); } private static Map> listOfSnapshotsToDelete(final ClusterState currentState, final Set indicesToDelete) { @@ -184,7 +196,9 @@ private static Map> listOfSnapshotsToDelete(final Cluste // TODO change this to an assertion once it becomes impossible to delete a snapshot that is mounted as an index if (currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY) - .getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(new SnapshotId(snapshotName, snapshotUuid)))) { + .getEntries() + .stream() + .anyMatch(entry -> entry.getSnapshots().contains(new SnapshotId(snapshotName, snapshotUuid)))) { continue; // this snapshot is part of an existing snapshot deletion in progress, nothing to do } @@ -214,15 +228,14 @@ private static Map> listOfSnapshotsToDelete(final Cluste private static String repositoryNameFromIndexSettings(ClusterState currentState, Settings indexSettings) { final String repositoryUuid = indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY); - if (Strings.hasLength(repositoryUuid) == false) { - return indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY); + if 
(Strings.hasLength(repositoryUuid)) { + final RepositoriesMetadata repositories = currentState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + for (RepositoryMetadata repository : repositories.repositories()) { + if (repositoryUuid.equals(repository.uuid())) { + return repository.name(); + } + } } - final RepositoriesMetadata repoMetadata = currentState.metadata().custom(RepositoriesMetadata.TYPE); - final List repositories = repoMetadata == null ? emptyList() : repoMetadata.repositories(); - return repositories.stream() - .filter(r -> repositoryUuid.equals(r.uuid())) - .map(RepositoryMetadata::name) - .findFirst() - .orElseThrow(() -> new RepositoryMissingException(repositoryUuid)); + return indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java index 44d779017d544..c0aee0894c555 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java @@ -246,6 +246,10 @@ public RepositoryMetadata withSettings(Settings settings) { return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, snapshotsToDelete); } + public RepositoryMetadata withSnapshotsToDelete(List snapshotsToDelete) { + return new RepositoryMetadata(name, uuid, type, settings, generation, pendingGeneration, snapshotsToDelete); + } + public RepositoryMetadata addSnapshotsToDelete(Collection snapshotsToDelete) { final List snapshots = new ArrayList<>(this.snapshotsToDelete); for (SnapshotId snapshotToDelete : snapshotsToDelete) { diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 6bc972f5d2b8c..494ab967ce09b 100644 --- 
a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -478,7 +478,6 @@ public void apply(Settings value, Settings current, Settings previous) { HandshakingTransportAddressConnector.PROBE_CONNECT_TIMEOUT_SETTING, HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING, SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, - SnapshotsService.MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING, SnapshotsService.SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING, RestoreService.REFRESH_REPO_UUID_ON_RESTORE_SETTING, FsHealthService.ENABLED_SETTING, diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index da874835513f0..6a64319b76fcd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -217,7 +217,7 @@ public ClusterState execute(ClusterState currentState) { updatedMetadata = repositoryMetadata.withSettings(newRepositoryMetadata.settings()); } else { ensureRepositoryNotInUse(currentState, request.name()); - updatedMetadata = newRepositoryMetadata; + updatedMetadata = newRepositoryMetadata.withSnapshotsToDelete(repositoryMetadata.snapshotsToDelete()); } found = true; repositoriesMetadata.add(updatedMetadata); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index bfaae3d0b28a6..18ab809c8f83a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.ActionFilters; 
import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -69,7 +68,8 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -101,18 +101,17 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -123,6 +122,7 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; +import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY; /** * Service responsible for creating snapshots. 
This service runs all the steps executed on the master node during snapshot creation and @@ -224,12 +224,6 @@ public SnapshotsService( maxConcurrentOperations = MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); - maxConcurrentSnapshotsToDeletes = MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING.get(settings); - clusterService.getClusterSettings() - .addSettingsUpdateConsumer( - MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING, - i -> maxConcurrentSnapshotsToDeletes = i - ); snapshotsToDeleteRetryInterval = SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING, t -> snapshotsToDeleteRetryInterval = t); @@ -607,6 +601,35 @@ private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryD } } + private static Set listOfCloneSources(final ClusterState state) { + return state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) + .entries() + .stream() + .filter(SnapshotsInProgress.Entry::isClone) + .map(SnapshotsInProgress.Entry::source) + .collect(Collectors.toSet()); + } + + private static Set listOfRestoreSources(final ClusterState state) { + final Set snapshotIds = new HashSet<>(); + for (RestoreInProgress.Entry restore : state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) { + snapshotIds.add(restore.snapshot().getSnapshotId()); + } + return Set.copyOf(snapshotIds); + } + + private static Set listOfDeletionsSources(final ClusterState state) { + final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress == null) { + return Set.of(); + } + final Set snapshotIds = new HashSet<>(); + for (SnapshotDeletionsInProgress.Entry deletion : deletionsInProgress.getEntries()) { + 
snapshotIds.addAll(deletion.getSnapshots()); + } + return Set.copyOf(snapshotIds); + } + /** * Determine the number of shards in each index of a clone operation and update the cluster state accordingly. * @@ -1038,7 +1061,6 @@ public void applyClusterState(ClusterChangedEvent event) { } assert assertConsistentWithClusterState(event.state()); assert assertNoDanglingSnapshots(event.state()); - assert assertSnapshotsToDeleteConsistency(event.state()); } private boolean assertConsistentWithClusterState(ClusterState state) { @@ -1320,17 +1342,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS ); } - /** - * Maximum number of on-going snapshots to delete per repository - */ - public static final Setting MAX_CONCURRENT_SNAPSHOT_TO_DELETE_PER_REPOSITORY_SETTING = Setting.intSetting( - "snapshot.snapshots_to_delete.max_concurrent_operations", - 5, - 1, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - public static final Setting SNAPSHOT_TO_DELETE_RETRY_INTERVAL_SETTING = Setting.timeSetting( "snapshot.snapshots_to_delete.retry_interval", TimeValue.timeValueSeconds(30L), @@ -1339,242 +1350,162 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS Setting.Property.Dynamic ); - /** - * Mutex used to modify snapshots to delete related objects. - */ - private final Object snapshotToDeleteMutex = new Object(); - - /** - * Set of snapshots to delete whose deletion will be triggered, per repository - */ - private final Map> onGoingSnapshotsDeletions = new HashMap<>(); - - /** - * Queue of snapshots whose deletion must be triggered. Each repository has a dedicated queue. 
- */ - private final Map> queuesSnapshotsDeletions = new HashMap<>(); + private volatile TimeValue snapshotsToDeleteRetryInterval; /** - * Number of on-going snapshots to delete per repository + * Set of snapshots to delete whose deletion is already triggered */ - private final Map numberOfOnGoingSnapshotsToDeletes = new HashMap<>(); - - private volatile int maxConcurrentSnapshotsToDeletes; - private volatile TimeValue snapshotsToDeleteRetryInterval; + private final Set onGoingSnapshotsDeletions = ConcurrentCollections.newConcurrentSet(); /** - * Finds snapshots to delete in the the cluster state repositories metadata and triggers explicit snapshot delete requests. + * Finds snapshots to delete in the cluster state repositories metadata and triggers explicit snapshot delete requests. This method + * attempts to detect conflicting situations where triggering the snapshot deletion would likely fail due to a concurrent snapshot + * operation. In such cases the snapshot deletion is not triggered as it should be triggered by subsequent cluster state updates once the + * conflicting situation is resolved.
* * @param state the current {@link ClusterState} */ private void startDeletionOfSnapshotsToDelete(final ClusterState state) { - final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); - final boolean isMaster = state.nodes().isLocalNodeElectedMaster(); - - synchronized (snapshotToDeleteMutex) { - if (isMaster) { - for (RepositoryMetadata repository : repositories.repositories()) { - int newSnapshotsToDelete = 0; - for (SnapshotId snapshotId : repository.snapshotsToDelete()) { - if (onGoingSnapshotsDeletions.computeIfAbsent(repository.name(), r -> new LinkedHashSet<>()).add(snapshotId)) { - queuesSnapshotsDeletions.computeIfAbsent(repository.name(), q -> new LinkedList<>()).add(snapshotId); - newSnapshotsToDelete += 1; - } - } - - for (int i = 0; i < Math.min(newSnapshotsToDelete, maxConcurrentSnapshotsToDeletes); i++) { - processSnapshotsToDelete(repository.name()); - } - } + startDeletionOfSnapshotsToDelete(state, TimeValue.ZERO); + } - } else { - for (Map.Entry> queue : queuesSnapshotsDeletions.entrySet()) { - final Set snapshotsToDelete = onGoingSnapshotsDeletions.get(queue.getKey()); - assert snapshotsToDelete != null : queue.getKey() + " has no snapshots to delete"; - - SnapshotId snapshotId; - while ((snapshotId = queue.getValue().poll()) != null) { - final boolean removed = snapshotsToDelete.remove(snapshotId); - assert removed : "snapshot to delete not found: " + snapshotId; + private void startDeletionOfSnapshotsToDelete(final ClusterState state, final TimeValue delay) { + if (state.nodes().isLocalNodeElectedMaster() == false) { + return; // not the elected master node + } + final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + if (repositories.repositories().isEmpty()) { + return; // no repositories, nothing to do + } + if (state.custom(RepositoryCleanupInProgress.TYPE, 
RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress()) { + return; // a repository clean up is in progress, wait for cleanup to finish + } + + final Set currentDeletions = listOfDeletionsSources(state); + final Set currentRestores = listOfRestoreSources(state); + final Set currentClones = listOfCloneSources(state); + + for (RepositoryMetadata repository : repositories.repositories()) { + if (repository.hasSnapshotsToDelete() && repository.settings().getAsBoolean(READONLY_SETTING_KEY, false) == false) { + final Set snapshotIdsToDelete = new HashSet<>(); + for (SnapshotId snapshotId : repository.snapshotsToDelete()) { + if (currentRestores.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is being restored, waiting for restore to complete", snapshotId); + } else if (currentClones.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is being cloned, waiting for operation to complete", snapshotId); + } else if (currentDeletions.contains(snapshotId)) { + logger.trace("snapshot to delete [{}] is already queued", snapshotId); + } else if (onGoingSnapshotsDeletions.add(snapshotId)) { + logger.trace("found snapshot to delete [{}]", snapshotId); + snapshotIdsToDelete.add(snapshotId); } } - } - assert assertSnapshotsToDeleteInvariants(); - } - } - - private void processSnapshotsToDelete(final String repository) { - synchronized (snapshotToDeleteMutex) { - final int activeDeletes = numberOfOnGoingSnapshotsToDeletes.getOrDefault(repository, 0); - if (activeDeletes < maxConcurrentSnapshotsToDeletes) { - final Queue queue = queuesSnapshotsDeletions.get(repository); - if (queue != null) { - assert onGoingSnapshotsDeletions.containsKey(repository) : repository; - - final SnapshotId snapshotId = queue.poll(); - if (snapshotId != null) { - assert onGoingSnapshotsDeletions.get(repository).contains(snapshotId) : snapshotId; - - numberOfOnGoingSnapshotsToDeletes.put(repository, activeDeletes + 1); - threadPool.generic().execute(new 
SnapshotToDeleteRunnable(repository, snapshotId)); + if (snapshotIdsToDelete.isEmpty() == false) { + final SnapshotsToDeleteRunnable runnable = new SnapshotsToDeleteRunnable(repository.name(), snapshotIdsToDelete); + if (delay.getMillis() > 0) { + threadPool.scheduleUnlessShuttingDown(delay, ThreadPool.Names.GENERIC, runnable); + } else { + threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); } } } - assert assertSnapshotsToDeleteInvariants(); - } - } - - private void removeSnapshotsToDelete(final String repository, final SnapshotId snapshotId) { - synchronized (snapshotToDeleteMutex) { - final Set snapshotsToDelete = onGoingSnapshotsDeletions.get(repository); - assert snapshotsToDelete != null : repository + " has no snapshots to delete"; - - final boolean removed = snapshotsToDelete.remove(snapshotId); - assert removed : "snapshot to delete not found: " + snapshotId; - - final int activeDeletes = numberOfOnGoingSnapshotsToDeletes.getOrDefault(repository, -1); - assert activeDeletes > 0 : repository + " has no active deletes: " + activeDeletes; - - if (activeDeletes > 1) { - numberOfOnGoingSnapshotsToDeletes.put(repository, activeDeletes - 1); - } else { - final Integer removedActiveDeletes = numberOfOnGoingSnapshotsToDeletes.remove(repository); - assert removedActiveDeletes != null && removedActiveDeletes == 1 : removedActiveDeletes; - - final Set r = onGoingSnapshotsDeletions.remove(repository); - assert r.isEmpty() : repository + " has non empty " + r; - - final Queue q = queuesSnapshotsDeletions.remove(repository); - assert q.isEmpty() : repository + " has non empty " + q; - } - assert assertSnapshotsToDeleteInvariants(); } } /** - * A {@link Runnable} used to process the deletion of a snapshot marked as to delete. + * A {@link Runnable} used to process the deletion of snapshots marked as to delete for a given repository. 
*/ - private class SnapshotToDeleteRunnable extends AbstractRunnable { + private class SnapshotsToDeleteRunnable extends AbstractRunnable { - private final SnapshotId snapshotId; - private final String repository; - - private boolean retryLater; + private final Set snapshotIdsToDelete; + private final String repositoryName; - SnapshotToDeleteRunnable(String repository, SnapshotId snapshotId) { - super(); - this.repository = Objects.requireNonNull(repository); - this.snapshotId = Objects.requireNonNull(snapshotId); - this.retryLater = false; + SnapshotsToDeleteRunnable(String repositoryName, Set snapshotIdsToDelete) { + this.repositoryName = Objects.requireNonNull(repositoryName); + this.snapshotIdsToDelete = Objects.requireNonNull(snapshotIdsToDelete); + assert this.snapshotIdsToDelete.isEmpty() == false; } @Override protected void doRun() throws Exception { - try { - logger.debug("[{}] triggering deletion of snapshot [{}]", repository, snapshotId); - final PlainActionFuture future = PlainActionFuture.newFuture(); - deleteSnapshotsByUuid(new DeleteSnapshotRequest(repository, snapshotId.getUUID()), future); - future.actionGet(); - } finally { - removeSnapshotsToDelete(repository, snapshotId); - } - } + final AtomicBoolean shouldRetry = new AtomicBoolean(); + final CountDown countDown = new CountDown(snapshotIdsToDelete.size()); - @Override - public void onFailure(Exception e) { - if (e instanceof SnapshotMissingException) { - logger.debug("[{}] snapshot to delete [{}] is already deleted", repository, snapshotId); + for (SnapshotId snapshotId : snapshotIdsToDelete) { + final ActionListener listener = ActionListener.runBefore(new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.debug("[{}] snapshot to delete [{}] successfully deleted", repositoryName, snapshotId); + retryIfNeeded(); + } - } else if (e instanceof RepositoryMissingException) { - logger.warn( - () -> new ParameterizedMessage("[{}] failed to delete snapshot [{}] (repository 
removed)", repository, snapshotId), - e - ); + @Override + public void onFailure(Exception e) { + if (e instanceof SnapshotMissingException) { + logger.debug( + () -> new ParameterizedMessage( + "[{}] snapshot to delete [{}] is already deleted or is missing", + repositoryName, + snapshotId + ), + e + ); + } else if (e instanceof RepositoryMissingException) { + logger.warn( + () -> new ParameterizedMessage( + "[{}] failed to delete snapshot [{}]: repository has been removed before snapshot could be deleted, " + + "the snapshot might be leaking", + repositoryName, + snapshotId + ), + e + ); + } else if (e instanceof ConcurrentSnapshotExecutionException) { + logger.debug( + "[{}] failed to delete snapshot [{}]: a concurrent operation is running", + repositoryName, + snapshotId + ); + } else { + logger.warn( + () -> new ParameterizedMessage("[{}] failed to delete snapshot [{}]", repositoryName, snapshotId), + e + ); + shouldRetry.set(true); + } + retryIfNeeded(); + } - } else if (e instanceof ConcurrentSnapshotExecutionException) { - logger.debug( - "[{}] failed to delete snapshot [{}] (concurrent operation running, retrying in {})", - repository, - snapshotId, - snapshotsToDeleteRetryInterval.getStringRep() - ); - retryLater = true; + void retryIfNeeded() { + if (countDown.countDown() && shouldRetry.get()) { + startDeletionOfSnapshotsToDelete(clusterService.state(), snapshotsToDeleteRetryInterval); + } + } + }, () -> { + final boolean removed = onGoingSnapshotsDeletions.remove(snapshotId); + assert removed : "snapshot to delete [" + snapshotId + "] not found in repository [" + repositoryName + ']'; + }); - } else { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot [{}]", repository, snapshotId), e); + try { + deleteSnapshotsByUuid(new DeleteSnapshotRequest(repositoryName, snapshotId.getUUID()), listener); + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshot [{}]", 
repositoryName, snapshotId), + e + ); + listener.onFailure(e); + } } } @Override - public void onAfter() { - if (retryLater == false) { - processSnapshotsToDelete(repository); - } else { - threadPool.scheduleUnlessShuttingDown( - snapshotsToDeleteRetryInterval, - ThreadPool.Names.GENERIC, - () -> startDeletionOfSnapshotsToDelete(clusterService.state()) - ); - } - } - } - - private boolean assertSnapshotsToDeleteInvariants() { - assert Thread.holdsLock(snapshotToDeleteMutex); - - final Set repositories = new HashSet<>(onGoingSnapshotsDeletions.keySet()); - repositories.addAll(queuesSnapshotsDeletions.keySet()); - repositories.addAll(numberOfOnGoingSnapshotsToDeletes.keySet()); - - for (String repository : repositories) { - if (onGoingSnapshotsDeletions.containsKey(repository) - || queuesSnapshotsDeletions.containsKey(repository) - || numberOfOnGoingSnapshotsToDeletes.containsKey(repository)) { - assert onGoingSnapshotsDeletions.get(repository) != null; - assert queuesSnapshotsDeletions.get(repository) != null; - assert numberOfOnGoingSnapshotsToDeletes.get(repository) != null; - } else { - assert onGoingSnapshotsDeletions.get(repository) == null; - assert queuesSnapshotsDeletions.get(repository) == null; - assert numberOfOnGoingSnapshotsToDeletes.get(repository) == null; - } - } - return true; - } - - /** - * Asserts that a snapshot to delete cannot be included both in {@link RepositoryMetadata} and in {@link SnapshotDeletionsInProgress}. 
- * - * @param state the current {@link ClusterState} - * @return true if all checks succeed - */ - private static boolean assertSnapshotsToDeleteConsistency(ClusterState state) { - final Map> snapshotsToDelete = new HashMap<>(); - for (RepositoryMetadata repository : state.metadata() - .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) - .repositories()) { - snapshotsToDelete.put(repository.name(), Set.copyOf(repository.snapshotsToDelete())); - } - final Map> onGoingSnapshotDeletions = new HashMap<>(); - for (SnapshotDeletionsInProgress.Entry deletionInProgress : state.custom( - SnapshotDeletionsInProgress.TYPE, - SnapshotDeletionsInProgress.EMPTY - ).getEntries()) { - onGoingSnapshotDeletions.put(deletionInProgress.repository(), Set.copyOf(deletionInProgress.getSnapshots())); - } - for (String repository : Sets.union(snapshotsToDelete.keySet(), onGoingSnapshotDeletions.keySet())) { - Set set1 = snapshotsToDelete.getOrDefault(repository, Set.of()); - Set set2 = onGoingSnapshotDeletions.getOrDefault(repository, Set.of()); - assert Sets.intersection(set1, set2).isEmpty() - : "repository [" - + repository - + "] has snapshots deletions still marked as to delete [SnapshotDeletionsInProgress={" - + onGoingSnapshotDeletions.get(repository) - + "}, RepositoryMetadata={" - + snapshotsToDelete.get(repository) - + "}]"; + public void onFailure(Exception e) { + logger.warn( + () -> new ParameterizedMessage("[{}] failed to trigger deletion of snapshots {}", repositoryName, snapshotIdsToDelete), + e + ); + onGoingSnapshotsDeletions.removeAll(snapshotIdsToDelete); } - return true; } private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( @@ -2455,11 +2386,7 @@ public ClusterState execute(ClusterState currentState) { return currentState; } - final Set activeCloneSources = snapshotsInProgress.entries() - .stream() - .filter(SnapshotsInProgress.Entry::isClone) - .map(SnapshotsInProgress.Entry::source) - .collect(Collectors.toSet()); + final Set 
activeCloneSources = listOfCloneSources(currentState); for (SnapshotId snapshotId : snapshotIds) { if (activeCloneSources.contains(snapshotId)) { throw new ConcurrentSnapshotExecutionException( @@ -2476,11 +2403,6 @@ public ClusterState execute(ClusterState currentState) { "delete snapshot" ); - final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( - SnapshotDeletionsInProgress.TYPE, - SnapshotDeletionsInProgress.EMPTY - ); - final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); // don't allow snapshot deletions while a restore is taking place, // otherwise we could end up deleting a snapshot that is being restored @@ -2520,7 +2442,12 @@ public ClusterState execute(ClusterState currentState) { // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions return updateWithSnapshots(currentState, updatedSnapshots, null); } + // add the snapshot deletion to the cluster state + final SnapshotDeletionsInProgress deletionsInProgress = currentState.custom( + SnapshotDeletionsInProgress.TYPE, + SnapshotDeletionsInProgress.EMPTY + ); final SnapshotDeletionsInProgress.Entry replacedEntry = deletionsInProgress.getEntries() .stream() .filter(entry -> entry.repository().equals(repositoryName)) @@ -2777,6 +2704,27 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres return updatedDeletions == null ? 
deletions : updatedDeletions; } + @Override + protected ClusterState updateSnapshotsToDelete(ClusterState state) { + final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE); + if (repositories == null) { + return state; + } + final RepositoryMetadata repository = repositories.repository(deleteEntry.repository()); + if (repository == null || repository.hasSnapshotsToDelete() == false) { + return state; + } + return ClusterState.builder(state) + .metadata( + Metadata.builder(state.metadata()) + .putCustom( + RepositoriesMetadata.TYPE, + repositories.removeSnapshotsToDelete(repository.name(), deleteEntry.getSnapshots()) + ) + ) + .build(); + } + @Override protected void handleListeners(List> deleteListeners) { assert repositoryData.getSnapshotIds().stream().noneMatch(deleteEntry.getSnapshots()::contains) @@ -2863,7 +2811,9 @@ public ClusterState execute(ClusterState currentState) { } final SnapshotDeletionsInProgress newDeletions = filterDeletions(updatedDeletions); final Tuple> res = readyDeletions( - updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions) + updateSnapshotsToDelete( + updateWithSnapshots(currentState, updatedSnapshotsInProgress(currentState, newDeletions), newDeletions) + ) ); readyDeletions = res.v2(); return res.v1(); @@ -2880,6 +2830,10 @@ protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgres return deletions; } + protected ClusterState updateSnapshotsToDelete(ClusterState currentState) { + return currentState; + } + @Override public final void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { final List> deleteListeners; @@ -3058,8 +3012,7 @@ private void markShardReassigned(RepositoryShardId shardId, Set restoreFuture = client().admin() .cluster() @@ -710,7 +714,7 @@ public void testSearchableSnapshotIsDeletedWithOnGoingRestore() throws Exception awaitClusterState(state -> 
state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).isEmpty() == false); - final ActionFuture deleteFuture = client().admin().indices().prepareDelete(mounted).execute(); + final ActionFuture deleteIndexFuture = client().admin().indices().prepareDelete(mounted).execute(); awaitClusterState( state -> state.metadata() @@ -725,11 +729,135 @@ public void testSearchableSnapshotIsDeletedWithOnGoingRestore() throws Exception final RestoreInfo restoreInfoResponse = restoreFuture.actionGet().getRestoreInfo(); assertThat(restoreInfoResponse.successfulShards(), equalTo(numShards.numPrimaries)); assertThat(restoreInfoResponse.failedShards(), equalTo(0)); - assertAcked(deleteFuture.get()); + assertAcked(deleteIndexFuture.get()); expectThrows(SnapshotMissingException.class, () -> clusterAdmin().prepareDeleteSnapshot(repository, snapshot).get()); } + public void testSearchableSnapshotIsDeletedWithOnGoingClone() throws Exception { + final String repository = "repository-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(logger, repository, "mock"); + + final String index = "index"; + assertAcked(prepareCreate(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); + ensureGreen(index); + populateIndex(index, scaledRandomIntBetween(0, 5_000)); + refresh(index); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repository, sourceSnapshot); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = "mounted-" + index; + mountSnapshot(repository, sourceSnapshot, index, mounted, deleteSnapshotIndexSettings(true), randomFrom(Storage.values())); + assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + + final IndexId indexId = getRepositoryData(repository).resolveIndexId(index); + 
blockMasterOnShardLevelSnapshotFile(repository, indexId.getId()); + + final String cloneTarget = "target-snapshot"; + final ActionFuture cloneSnapshot = clusterAdmin().prepareCloneSnapshot( + repository, + sourceSnapshot, + cloneTarget + ).setIndices(index).execute(); + awaitNumberOfSnapshotsInProgress(1); + + final String masterNode = internalCluster().getMasterName(); + waitForBlock(masterNode, repository); + + final ActionFuture deleteIndex = client().admin().indices().prepareDelete(mounted).execute(); + awaitClusterState( + state -> state.metadata() + .custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY) + .repository(repository) + .hasSnapshotsToDelete() + ); + + assertFalse(cloneSnapshot.isDone()); + unblockNode(repository, masterNode); + awaitNoMoreSnapshotsDeletions(); + + assertAcked(cloneSnapshot.get()); + assertAcked(deleteIndex.get()); + + assertThat(clusterAdmin().prepareSnapshotStatus().setRepository(repository).get().getSnapshots(), hasSize(0)); + } + + public void testSearchableSnapshotDeletedFromReadOnlyRepository() throws Exception { + final String repository = "repository-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final Settings.Builder repositorySettings = randomRepositorySettings(); + createRepository(repository, "mock", repositorySettings); + + final String index = "index"; + assertAcked(prepareCreate(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); + ensureGreen(index); + populateIndex(index, scaledRandomIntBetween(10, 5_000)); + refresh(index); + + final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repository, sourceSnapshot); + assertAcked(client().admin().indices().prepareDelete(index)); + + final String mounted = "mounted-" + index; + mountSnapshot(repository, sourceSnapshot, index, mounted, deleteSnapshotIndexSettings(true), 
randomFrom(Storage.values())); + assertHitCount(client().prepareSearch(mounted).setTrackTotalHits(true).get(), totalHits.value); + + logger.info("--> updating repository [{}] to be readonly", repository); + assertAcked( + client().admin() + .cluster() + .preparePutRepository(repository) + .setVerify(randomBoolean()) + .setType(FsRepository.TYPE) + .setSettings(repositorySettings.put(BlobStoreRepository.READONLY_SETTING_KEY, true)) + ); + + logger.info("--> deleting snapshot backed index [{}]", mounted); + assertAcked(client().admin().indices().prepareDelete(mounted)); + + awaitClusterState(state -> { + final RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY); + final RepositoryMetadata repositoryMetadata = repositories.repository(repository); + return repositoryMetadata != null + && repositoryMetadata.hasSnapshotsToDelete() + && repositoryMetadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, false); + }); + + logger.info("--> updating repository [{}] to be writeable again", repository); + assertBusy(() -> { + try { + AcknowledgedResponse response = client().admin() + .cluster() + .preparePutRepository(repository) + .setVerify(randomBoolean()) + .setType(FsRepository.TYPE) + .setSettings(repositorySettings.putNull(BlobStoreRepository.READONLY_SETTING_KEY)) + .get(); + assertAcked(response); + } catch (IllegalStateException e) { + assertThat( + e.getMessage(), + containsString( + "trying to modify or unregister repository [" + + repository + + "] that is currently used (snapshot deletion is in progress)" + ) + ); + } + }); + + awaitNoMoreSnapshotsDeletions(); + expectThrows( + SnapshotMissingException.class, + () -> clusterAdmin().prepareSnapshotStatus().setRepository(repository).setSnapshots(sourceSnapshot).get() + ); + } + private static Settings deleteSnapshotIndexSettings(boolean value) { return Settings.builder().put(SEARCHABLE_SNAPSHOTS_DELETE_SNAPSHOT_ON_INDEX_DELETION, 
value).build(); } From 83dfa73b0b61d8c62c1e19bafff38405a7786316 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 14 Sep 2021 12:26:34 +0200 Subject: [PATCH 10/11] fix random int in test --- .../SearchableSnapshotsRepositoryIntegTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java index 3089464ab3307..d4a98866646b2 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsRepositoryIntegTests.java @@ -686,7 +686,7 @@ public void testSearchableSnapshotIsDeletedWithOnGoingRestore() throws Exception final String index = "index"; assertAcked(prepareCreate(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); ensureGreen(index); - populateIndex(index, scaledRandomIntBetween(0, 5_000)); + populateIndex(index, scaledRandomIntBetween(10, 5_000)); refresh(index); final TotalHits totalHits = internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); @@ -741,7 +741,7 @@ public void testSearchableSnapshotIsDeletedWithOnGoingClone() throws Exception { final String index = "index"; assertAcked(prepareCreate(index, Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true))); ensureGreen(index); - populateIndex(index, scaledRandomIntBetween(0, 5_000)); + populateIndex(index, scaledRandomIntBetween(10, 5_000)); refresh(index); final TotalHits totalHits = 
internalCluster().client().prepareSearch(index).setTrackTotalHits(true).get().getHits().getTotalHits(); From b1f36f8ac9b465fd3eb0da13f715af33b0636fbc Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 2 Feb 2022 21:49:19 +0100 Subject: [PATCH 11/11] Update docs/changelog/75565.yaml --- docs/changelog/75565.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/75565.yaml diff --git a/docs/changelog/75565.yaml b/docs/changelog/75565.yaml new file mode 100644 index 0000000000000..9dc43c4fee361 --- /dev/null +++ b/docs/changelog/75565.yaml @@ -0,0 +1,5 @@ +pr: 75565 +summary: Delete backing snapshot when searchable snapshot index is deleted +area: Snapshot/Restore +type: enhancement +issues: []