Skip to content

Commit cc18caf

Browse files
authored
Ensure error handler is called during SLM retention callback failure (#55252)
When retrieving the snapshots for a set of repositories, or when deleting a single snapshot, the body of the `ActionListener`'s `onResponse` method can itself throw an exception. When that happens, the failure handler that was passed in (the `errorHandler` consumer when retrieving, or the listener's `onFailure` when deleting) may never be executed, so the `running` boolean is not reset to false. This commit uses `ActionListener.wrap(...)` instead of creating a new anonymous `ActionListener`, which ensures that if `onResponse` fails in any way, the `onFailure` handler is still called. Resolves #55217
1 parent 8638d08 commit cc18caf

File tree

2 files changed

+126
-19
lines changed

2 files changed

+126
-19
lines changed

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.ElasticsearchException;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.action.LatchedActionListener;
15-
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
1615
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1716
import org.elasticsearch.client.Client;
1817
import org.elasticsearch.client.OriginSettingClient;
@@ -252,14 +251,13 @@ void getAllRetainableSnapshots(Collection<String> repositories, ActionListener<M
252251
if (repositories.isEmpty()) {
253252
// Skip retrieving anything if there are no repositories to fetch
254253
listener.onResponse(Collections.emptyMap());
254+
return;
255255
}
256256

257257
client.admin().cluster()
258258
.prepareGetSnapshots(repositories.toArray(Strings.EMPTY_ARRAY))
259259
.setIgnoreUnavailable(true)
260-
.execute(new ActionListener<>() {
261-
@Override
262-
public void onResponse(final GetSnapshotsResponse resp) {
260+
.execute(ActionListener.wrap(resp -> {
263261
if (logger.isTraceEnabled()) {
264262
logger.trace("retrieved snapshots: {}",
265263
repositories.stream()
@@ -276,14 +274,11 @@ public void onResponse(final GetSnapshotsResponse resp) {
276274
.collect(Collectors.toList()));
277275
});
278276
listener.onResponse(snapshots);
279-
}
280-
281-
@Override
282-
public void onFailure(Exception e) {
277+
},
278+
e -> {
283279
logger.debug(new ParameterizedMessage("unable to retrieve snapshots for [{}] repositories", repositories), e);
284280
errorHandler.accept(e);
285-
}
286-
});
281+
}));
287282
}
288283

289284
static String getPolicyId(SnapshotInfo snapshotInfo) {
@@ -424,26 +419,21 @@ void deleteSnapshot(String slmPolicy, String repo, SnapshotId snapshot, Snapshot
424419
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot);
425420
CountDownLatch latch = new CountDownLatch(1);
426421
client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.getName())
427-
.execute(new LatchedActionListener<>(new ActionListener<>() {
428-
@Override
429-
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
422+
.execute(new LatchedActionListener<>(ActionListener.wrap(acknowledgedResponse -> {
430423
if (acknowledgedResponse.isAcknowledged()) {
431424
logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot);
432425
} else {
433426
logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot);
434427
}
435428
slmStats.snapshotDeleted(slmPolicy);
436429
listener.onResponse(acknowledgedResponse);
437-
}
438-
439-
@Override
440-
public void onFailure(Exception e) {
430+
},
431+
e -> {
441432
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
442433
repo, snapshot), e);
443434
slmStats.snapshotDeleteFailure(slmPolicy);
444435
listener.onFailure(e);
445-
}
446-
}, latch));
436+
}), latch));
447437
try {
448438
// Deletes cannot occur simultaneously, so wait for this
449439
// deletion to complete before attempting the next one

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66

77
package org.elasticsearch.xpack.slm;
88

9+
import org.elasticsearch.ElasticsearchException;
910
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.ActionRequest;
12+
import org.elasticsearch.action.ActionResponse;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
15+
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
16+
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
1017
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1118
import org.elasticsearch.client.Client;
1219
import org.elasticsearch.cluster.ClusterName;
@@ -45,6 +52,7 @@
4552
import java.util.Arrays;
4653
import java.util.Collection;
4754
import java.util.Collections;
55+
import java.util.HashMap;
4856
import java.util.List;
4957
import java.util.Map;
5058
import java.util.Set;
@@ -53,6 +61,7 @@
5361
import java.util.concurrent.TimeUnit;
5462
import java.util.concurrent.atomic.AtomicBoolean;
5563
import java.util.concurrent.atomic.AtomicLong;
64+
import java.util.concurrent.atomic.AtomicReference;
5665
import java.util.function.Consumer;
5766
import java.util.function.Function;
5867
import java.util.function.LongSupplier;
@@ -375,6 +384,114 @@ public void testOkToDeleteSnapshots() {
375384
assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(true));
376385
}
377386

387+
/**
 * Checks that when the response listener handed to
 * {@code SnapshotRetentionTask#getAllRetainableSnapshots} throws from its
 * {@code onResponse}, the separately supplied error-handler consumer still
 * receives that exception.
 *
 * @throws Exception if the test thread pool does not terminate cleanly
 */
public void testErrStillRunsFailureHandlerWhenRetrieving() throws Exception {
388+
ThreadPool threadPool = new TestThreadPool("slm-test");
389+
final String policyId = "policy";
390+
final String repoId = "repo";
391+
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
392+
// Client stub: answers get-snapshots requests immediately with an empty
// snapshot list for repoId; every other request goes to the NoOpClient default.
Client noOpClient = new NoOpClient("slm-test") {
393+
394+
@Override
395+
@SuppressWarnings("unchecked")
396+
protected <Request extends ActionRequest, Response extends ActionResponse>
397+
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
398+
if (request instanceof GetSnapshotsRequest) {
399+
logger.info("--> called");
400+
// The listener is invoked inline here, so the assertions after the
// getAllRetainableSnapshots call below see the outcome without waiting.
listener.onResponse((Response) new GetSnapshotsResponse(
401+
Collections.singleton(GetSnapshotsResponse.Response.snapshots(repoId, Collections.emptyList()))));
402+
} else {
403+
super.doExecute(action, request, listener);
404+
}
405+
}
406+
}) {
407+
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?",
408+
repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null));
409+
410+
ClusterState state = createState(policy);
411+
ClusterServiceUtils.setState(clusterService, state);
412+
413+
// The history store fails the test if any history item is written.
SnapshotRetentionTask task = new SnapshotRetentionTask(noOpClient, clusterService,
414+
System::nanoTime,
415+
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC,
416+
(historyItem) -> fail("should never write history")),
417+
threadPool);
418+
419+
// Captures whatever exception is delivered to the error-handler consumer.
AtomicReference<Exception> errHandlerCalled = new AtomicReference<>(null);
420+
task.getAllRetainableSnapshots(Collections.singleton(repoId), new ActionListener<>() {
421+
@Override
422+
public void onResponse(Map<String, List<SnapshotInfo>> stringListMap) {
423+
logger.info("--> forcing failure");
424+
// Simulates a bug in the success callback; this is the exception the
// error handler is expected to receive.
throw new ElasticsearchException("forced failure");
425+
}
426+
427+
@Override
428+
public void onFailure(Exception e) {
429+
// Failures are expected on the separate error handler, not on this listener.
fail("we have another err handler that should have been called");
430+
}
431+
}, errHandlerCalled::set);
432+
433+
// The error handler must have been given the exception thrown from onResponse.
assertNotNull(errHandlerCalled.get());
434+
assertThat(errHandlerCalled.get().getMessage(), equalTo("forced failure"));
435+
} finally {
436+
threadPool.shutdownNow();
437+
threadPool.awaitTermination(10, TimeUnit.SECONDS);
438+
}
439+
}
440+
441+
/**
 * Checks that when the listener handed to
 * {@code SnapshotRetentionTask#deleteSnapshot} throws from its
 * {@code onResponse}, the same listener's {@code onFailure} is still invoked.
 *
 * @throws Exception if the test thread pool does not terminate cleanly
 */
public void testErrStillRunsFailureHandlerWhenDeleting() throws Exception {
442+
ThreadPool threadPool = new TestThreadPool("slm-test");
443+
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
444+
// Client stub: acknowledges delete-snapshot requests immediately; every other
// request goes to the NoOpClient default.
Client noOpClient = new NoOpClient("slm-test") {
445+
446+
@Override
447+
@SuppressWarnings("unchecked")
448+
protected <Request extends ActionRequest, Response extends ActionResponse>
449+
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
450+
if (request instanceof DeleteSnapshotRequest) {
451+
logger.info("--> called");
452+
listener.onResponse((Response) new AcknowledgedResponse(true));
453+
} else {
454+
super.doExecute(action, request, listener);
455+
}
456+
}
457+
}) {
458+
final String policyId = "policy";
459+
final String repoId = "repo";
460+
SnapshotLifecyclePolicy policy = new SnapshotLifecyclePolicy(policyId, "snap", "1 * * * * ?",
461+
repoId, null, new SnapshotRetentionConfiguration(TimeValue.timeValueDays(30), null, null));
462+
463+
ClusterState state = createState(policy);
464+
ClusterServiceUtils.setState(clusterService, state);
465+
466+
// The history store fails the test if any history item is written.
SnapshotRetentionTask task = new SnapshotRetentionTask(noOpClient, clusterService,
467+
System::nanoTime,
468+
new SnapshotLifecycleTaskTests.VerifyingHistoryStore(noOpClient, ZoneOffset.UTC,
469+
(historyItem) -> fail("should never write history")),
470+
threadPool);
471+
472+
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
473+
// NOTE(review): errHandlerCalled is declared here but never read or passed
// anywhere in this method; it looks like a leftover from the retrieval test.
AtomicReference<Exception> errHandlerCalled = new AtomicReference<>(null);
474+
// NOTE(review): the call uses the literals "policy" and "foo" rather than the
// policyId/repoId locals above; "foo" differs from repoId ("repo") - confirm intended.
task.deleteSnapshot("policy", "foo", new SnapshotId("name", "uuid"),
475+
new SnapshotLifecycleStats(0, 0, 0, 0, new HashMap<>()), new ActionListener<>() {
476+
@Override
477+
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
478+
logger.info("--> forcing failure");
479+
// Simulates a bug in the success callback.
throw new ElasticsearchException("forced failure");
480+
}
481+
482+
@Override
483+
public void onFailure(Exception e) {
484+
// Records that the failure path ran; the exception itself is not inspected.
onFailureCalled.set(true);
485+
}
486+
});
487+
488+
// The throw from onResponse must have been routed to onFailure.
assertThat(onFailureCalled.get(), equalTo(true));
489+
} finally {
490+
threadPool.shutdownNow();
491+
threadPool.awaitTermination(10, TimeUnit.SECONDS);
492+
}
493+
}
494+
378495
public void testSkipWhileStopping() throws Exception {
379496
doTestSkipDuringMode(OperationMode.STOPPING);
380497
}

0 commit comments

Comments
 (0)