Skip to content

Commit 32776bc

Browse files
authored
Also reroute after shard snapshot size fetch failure (#66006)
In #61906 we added the possibility for the master node to fetch the size of a shard snapshot before allocating the shard to a data node with enough disk space to host it. When merging this change we agreed that any failure during size fetching should not prevent the shard from being allocated. Sadly it does not work as expected: the service only triggers reroutes when fetching the size succeeds, but never when it fails. This means that a shard might stay unassigned until another cluster state update triggers a new allocation (as in #64372). Worse, the test I wrote was wrong, as it explicitly triggered a reroute. This commit changes the InternalSnapshotsInfoService so that it also triggers a reroute when fetching the snapshot shard size fails, ensuring that the allocation can move forward by using an UNAVAILABLE_EXPECTED_SHARD_SIZE shard size. This unknown shard size is kept around in the snapshot info service until no corresponding unassigned shards need the information. Backport of #65436 for 7.11
1 parent 7995c79 commit 32776bc

File tree

4 files changed

+95
-97
lines changed

4 files changed

+95
-97
lines changed

server/src/main/java/org/elasticsearch/snapshots/InternalSnapshotsInfoService.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class InternalSnapshotsInfoService implements ClusterStateListener, Snaps
7272
private final Supplier<RerouteService> rerouteService;
7373

7474
/** contains the snapshot shards for which the size is known **/
75-
private volatile ImmutableOpenMap<SnapshotShard, Long> knownSnapshotShardSizes;
75+
private volatile ImmutableOpenMap<SnapshotShard, Long> knownSnapshotShards;
7676

7777
private volatile boolean isMaster;
7878

@@ -99,7 +99,7 @@ public InternalSnapshotsInfoService(
9999
this.threadPool = clusterService.getClusterApplierService().threadPool();
100100
this.repositoriesService = repositoriesServiceSupplier;
101101
this.rerouteService = rerouteServiceSupplier;
102-
this.knownSnapshotShardSizes = ImmutableOpenMap.of();
102+
this.knownSnapshotShards = ImmutableOpenMap.of();
103103
this.unknownSnapshotShards = new LinkedHashSet<>();
104104
this.failedSnapshotShards = new LinkedHashSet<>();
105105
this.queue = new LinkedList<>();
@@ -120,10 +120,12 @@ private void setMaxConcurrentFetches(Integer maxConcurrentFetches) {
120120
@Override
121121
public SnapshotShardSizeInfo snapshotShardSizes() {
122122
synchronized (mutex){
123-
final ImmutableOpenMap.Builder<SnapshotShard, Long> snapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShardSizes);
124-
for (SnapshotShard snapshotShard : failedSnapshotShards) {
125-
Long previous = snapshotShardSizes.put(snapshotShard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
126-
assert previous == null : "snapshot shard size already known for " + snapshotShard;
123+
final ImmutableOpenMap.Builder<SnapshotShard, Long> snapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShards);
124+
if (failedSnapshotShards.isEmpty() == false) {
125+
for (SnapshotShard snapshotShard : failedSnapshotShards) {
126+
Long previous = snapshotShardSizes.put(snapshotShard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
127+
assert previous == null : "snapshot shard size already known for " + snapshotShard;
128+
}
127129
}
128130
return new SnapshotShardSizeInfo(snapshotShardSizes.build());
129131
}
@@ -139,10 +141,9 @@ public void clusterChanged(ClusterChangedEvent event) {
139141
isMaster = true;
140142
for (SnapshotShard snapshotShard : onGoingSnapshotRecoveries) {
141143
// check if already populated entry
142-
if (knownSnapshotShardSizes.containsKey(snapshotShard) == false) {
144+
if (knownSnapshotShards.containsKey(snapshotShard) == false && failedSnapshotShards.contains(snapshotShard) == false) {
143145
// check if already fetching snapshot info in progress
144146
if (unknownSnapshotShards.add(snapshotShard)) {
145-
failedSnapshotShards.remove(snapshotShard); // retry the failed shard
146147
queue.add(snapshotShard);
147148
unknownShards += 1;
148149
}
@@ -162,7 +163,7 @@ public void clusterChanged(ClusterChangedEvent event) {
162163
// have to repopulate the data over and over in an unstable master situation?
163164
synchronized (mutex) {
164165
// information only needed on current master
165-
knownSnapshotShardSizes = ImmutableOpenMap.of();
166+
knownSnapshotShards = ImmutableOpenMap.of();
166167
failedSnapshotShards.clear();
167168
isMaster = false;
168169
SnapshotShard snapshotShard;
@@ -175,7 +176,7 @@ public void clusterChanged(ClusterChangedEvent event) {
175176
} else {
176177
synchronized (mutex) {
177178
assert unknownSnapshotShards.isEmpty() || unknownSnapshotShards.size() == activeFetches;
178-
assert knownSnapshotShardSizes.isEmpty();
179+
assert knownSnapshotShards.isEmpty();
179180
assert failedSnapshotShards.isEmpty();
180181
assert isMaster == false;
181182
assert queue.isEmpty();
@@ -228,10 +229,10 @@ protected void doRun() throws Exception {
228229
assert removed : "snapshot shard to remove does not exist " + snapshotShardSize;
229230
if (isMaster) {
230231
final ImmutableOpenMap.Builder<SnapshotShard, Long> newSnapshotShardSizes =
231-
ImmutableOpenMap.builder(knownSnapshotShardSizes);
232+
ImmutableOpenMap.builder(knownSnapshotShards);
232233
updated = newSnapshotShardSizes.put(snapshotShard, snapshotShardSize) == null;
233234
assert updated : "snapshot shard size already exists for " + snapshotShard;
234-
knownSnapshotShardSizes = newSnapshotShardSizes.build();
235+
knownSnapshotShards = newSnapshotShardSizes.build();
235236
}
236237
activeFetches -= 1;
237238
assert invariant();
@@ -244,17 +245,21 @@ protected void doRun() throws Exception {
244245
@Override
245246
public void onFailure(Exception e) {
246247
logger.warn(() -> new ParameterizedMessage("failed to retrieve shard size for {}", snapshotShard), e);
248+
boolean failed = false;
247249
synchronized (mutex) {
248250
if (isMaster) {
249-
final boolean added = failedSnapshotShards.add(snapshotShard);
250-
assert added : "snapshot shard size already failed for " + snapshotShard;
251+
failed = failedSnapshotShards.add(snapshotShard);
252+
assert failed : "snapshot shard size already failed for " + snapshotShard;
251253
}
252254
if (removed == false) {
253255
unknownSnapshotShards.remove(snapshotShard);
254256
}
255257
activeFetches -= 1;
256258
assert invariant();
257259
}
260+
if (failed) {
261+
rerouteService.get().reroute("snapshot shard size failed", Priority.HIGH, REROUTE_LISTENER);
262+
}
258263
}
259264

260265
@Override
@@ -266,16 +271,16 @@ public void onAfter() {
266271
private void cleanUpSnapshotShardSizes(Set<SnapshotShard> requiredSnapshotShards) {
267272
assert Thread.holdsLock(mutex);
268273
ImmutableOpenMap.Builder<SnapshotShard, Long> newSnapshotShardSizes = null;
269-
for (ObjectCursor<SnapshotShard> shard : knownSnapshotShardSizes.keys()) {
274+
for (ObjectCursor<SnapshotShard> shard : knownSnapshotShards.keys()) {
270275
if (requiredSnapshotShards.contains(shard.value) == false) {
271276
if (newSnapshotShardSizes == null) {
272-
newSnapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShardSizes);
277+
newSnapshotShardSizes = ImmutableOpenMap.builder(knownSnapshotShards);
273278
}
274279
newSnapshotShardSizes.remove(shard.value);
275280
}
276281
}
277282
if (newSnapshotShardSizes != null) {
278-
knownSnapshotShardSizes = newSnapshotShardSizes.build();
283+
knownSnapshotShards = newSnapshotShardSizes.build();
279284
}
280285
failedSnapshotShards.retainAll(requiredSnapshotShards);
281286
}
@@ -284,16 +289,16 @@ private boolean invariant() {
284289
assert Thread.holdsLock(mutex);
285290
assert activeFetches >= 0 : "active fetches should be greater than or equal to zero but got: " + activeFetches;
286291
assert activeFetches <= maxConcurrentFetches : activeFetches + " <= " + maxConcurrentFetches;
287-
for (ObjectCursor<SnapshotShard> cursor : knownSnapshotShardSizes.keys()) {
292+
for (ObjectCursor<SnapshotShard> cursor : knownSnapshotShards.keys()) {
288293
assert unknownSnapshotShards.contains(cursor.value) == false : "cannot be known and unknown at same time: " + cursor.value;
289294
assert failedSnapshotShards.contains(cursor.value) == false : "cannot be known and failed at same time: " + cursor.value;
290295
}
291296
for (SnapshotShard shard : unknownSnapshotShards) {
292-
assert knownSnapshotShardSizes.keys().contains(shard) == false : "cannot be unknown and known at same time: " + shard;
297+
assert knownSnapshotShards.keys().contains(shard) == false : "cannot be unknown and known at same time: " + shard;
293298
assert failedSnapshotShards.contains(shard) == false : "cannot be unknown and failed at same time: " + shard;
294299
}
295300
for (SnapshotShard shard : failedSnapshotShards) {
296-
assert knownSnapshotShardSizes.keys().contains(shard) == false : "cannot be failed and known at same time: " + shard;
301+
assert knownSnapshotShards.keys().contains(shard) == false : "cannot be failed and known at same time: " + shard;
297302
assert unknownSnapshotShards.contains(shard) == false : "cannot be failed and unknown at same time: " + shard;
298303
}
299304
return true;
@@ -315,7 +320,7 @@ int numberOfFailedSnapshotShardSizes() {
315320

316321
// used in tests
317322
int numberOfKnownSnapshotShardSizes() {
318-
return knownSnapshotShardSizes.size();
323+
return knownSnapshotShards.size();
319324
}
320325

321326
private static Set<SnapshotShard> listOfSnapshotShards(final ClusterState state) {

server/src/test/java/org/elasticsearch/snapshots/InternalSnapshotsInfoServiceTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
179179
}
180180

181181
public void testErroneousSnapshotShardSizes() throws Exception {
182+
final AtomicInteger reroutes = new AtomicInteger();
183+
final RerouteService rerouteService = (reason, priority, listener) -> {
184+
reroutes.incrementAndGet();
185+
listener.onResponse(clusterService.state());
186+
};
187+
182188
final InternalSnapshotsInfoService snapshotsInfoService =
183189
new InternalSnapshotsInfoService(Settings.builder()
184190
.put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10))
@@ -244,6 +250,9 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
244250
assertThat(snapshotShardSizeInfo.getShardSize(shardRouting, defaultValue),
245251
success ? equalTo(results.get(snapshotShard.getKey())) : equalTo(defaultValue));
246252
}
253+
254+
assertThat("Expecting all snapshot shard size fetches to provide a size", results.size(), equalTo(maxShardsToCreate));
255+
assertThat("Expecting all snapshot shard size fetches to execute a Reroute", reroutes.get(), equalTo(maxShardsToCreate));
247256
}
248257

249258
public void testNoLongerMaster() throws Exception {

0 commit comments

Comments
 (0)