Skip to content

Commit 910deb3

Browse files
committed
Retry ILM steps that fail due to SnapshotInProgressException (#37624)
Some steps, such as those that delete, close, or freeze an index, may fail because a snapshot of the index is currently running. In those cases, rather than moving to the ERROR step, we should retry the step once the snapshot has completed. This change adds an abstract step (`AsyncRetryDuringSnapshotActionStep`) that such steps can extend in order to handle an in-progress snapshot automatically. When the listener wrapper receives a `SnapshotInProgressException`, it registers a `ClusterStateObserver` listener that waits until the snapshot has completed and then re-runs the ILM action once no snapshot of the index is in progress. This also adds integration tests for these scenarios (thanks to @talevy in #37552). Resolves #37541
1 parent 17ffdd4 commit 910deb3

35 files changed

+593
-83
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AbstractUnfollowIndexStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.client.Client;
99
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.ClusterStateObserver;
1011
import org.elasticsearch.cluster.metadata.IndexMetaData;
1112

1213
import java.util.Map;
@@ -20,7 +21,8 @@ abstract class AbstractUnfollowIndexStep extends AsyncActionStep {
2021
}
2122

2223
@Override
23-
public final void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) {
24+
public final void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
25+
ClusterStateObserver observer, Listener listener) {
2426
String followerIndex = indexMetaData.getIndex().getName();
2527
Map<String, String> customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY);
2628
if (customIndexMetadata == null) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/AsyncActionStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.client.Client;
99
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.ClusterStateObserver;
1011
import org.elasticsearch.cluster.metadata.IndexMetaData;
1112

1213
/**
@@ -29,7 +30,8 @@ public boolean indexSurvives() {
2930
return true;
3031
}
3132

32-
public abstract void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener);
33+
public abstract void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
34+
ClusterStateObserver observer, Listener listener);
3335

3436
public interface Listener {
3537

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.core.indexlifecycle;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.ClusterStateObserver;
14+
import org.elasticsearch.cluster.SnapshotsInProgress;
15+
import org.elasticsearch.cluster.metadata.IndexMetaData;
16+
import org.elasticsearch.common.unit.TimeValue;
17+
import org.elasticsearch.index.Index;
18+
import org.elasticsearch.repositories.IndexId;
19+
import org.elasticsearch.snapshots.SnapshotInProgressException;
20+
21+
import java.util.function.Consumer;
22+
23+
/**
24+
* This is an abstract AsyncActionStep that wraps the performed action listener, checking to see
25+
* if the action fails due to a snapshot being in progress. If a snapshot is in progress, it
26+
* registers an observer and waits to try again when a snapshot is no longer running.
27+
*/
28+
public abstract class AsyncRetryDuringSnapshotActionStep extends AsyncActionStep {
29+
private final Logger logger = LogManager.getLogger(AsyncRetryDuringSnapshotActionStep.class);
30+
31+
public AsyncRetryDuringSnapshotActionStep(StepKey key, StepKey nextStepKey, Client client) {
32+
super(key, nextStepKey, client);
33+
}
34+
35+
@Override
36+
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
37+
ClusterStateObserver observer, Listener listener) {
38+
// Wrap the original listener to handle exceptions caused by ongoing snapshots
39+
SnapshotExceptionListener snapshotExceptionListener = new SnapshotExceptionListener(indexMetaData.getIndex(), listener, observer);
40+
performDuringNoSnapshot(indexMetaData, currentClusterState, snapshotExceptionListener);
41+
}
42+
43+
/**
44+
* Method to be performed during which no snapshots for the index are already underway.
45+
*/
46+
abstract void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener);
47+
48+
/**
49+
* SnapshotExceptionListener is an injected listener wrapper that checks to see if a particular
50+
* action failed due to a {@code SnapshotInProgressException}. If it did, then it registers a
51+
* ClusterStateObserver listener waiting for the next time the snapshot is not running,
52+
* re-running the step's {@link #performAction(IndexMetaData, ClusterState, ClusterStateObserver, Listener)}
53+
* method when the snapshot is no longer running.
54+
*/
55+
class SnapshotExceptionListener implements AsyncActionStep.Listener {
56+
private final Index index;
57+
private final Listener originalListener;
58+
private final ClusterStateObserver observer;
59+
60+
SnapshotExceptionListener(Index index, Listener originalListener, ClusterStateObserver observer) {
61+
this.index = index;
62+
this.originalListener = originalListener;
63+
this.observer = observer;
64+
}
65+
66+
@Override
67+
public void onResponse(boolean complete) {
68+
originalListener.onResponse(complete);
69+
}
70+
71+
@Override
72+
public void onFailure(Exception e) {
73+
if (e instanceof SnapshotInProgressException) {
74+
try {
75+
logger.debug("[{}] attempted to run ILM step but a snapshot is in progress, step will retry at a later time",
76+
index.getName());
77+
observer.waitForNextChange(
78+
new NoSnapshotRunningListener(observer, index.getName(), state -> {
79+
IndexMetaData idxMeta = state.metaData().index(index);
80+
if (idxMeta == null) {
81+
// The index has since been deleted, mission accomplished!
82+
originalListener.onResponse(true);
83+
}
84+
// Re-invoke the performAction method with the new state
85+
performAction(idxMeta, state, observer, originalListener);
86+
}, originalListener::onFailure),
87+
// TODO: what is a good timeout value for no new state received during this time?
88+
TimeValue.timeValueHours(12));
89+
} catch (Exception secondError) {
90+
// There was a second error trying to set up an observer,
91+
// fail the original listener
92+
secondError.addSuppressed(e);
93+
originalListener.onFailure(secondError);
94+
}
95+
} else {
96+
originalListener.onFailure(e);
97+
}
98+
}
99+
}
100+
101+
/**
102+
* A {@link ClusterStateObserver.Listener} that invokes the given function with the new state,
103+
* once no snapshots are running. If a snapshot is still running it registers a new listener
104+
* and tries again. Passes any exceptions to the original exception listener if they occur.
105+
*/
106+
class NoSnapshotRunningListener implements ClusterStateObserver.Listener {
107+
108+
private final Consumer<ClusterState> reRun;
109+
private final Consumer<Exception> exceptionConsumer;
110+
private final ClusterStateObserver observer;
111+
private final String indexName;
112+
113+
NoSnapshotRunningListener(ClusterStateObserver observer, String indexName,
114+
Consumer<ClusterState> reRun,
115+
Consumer<Exception> exceptionConsumer) {
116+
this.observer = observer;
117+
this.reRun = reRun;
118+
this.exceptionConsumer = exceptionConsumer;
119+
this.indexName = indexName;
120+
}
121+
122+
@Override
123+
public void onNewClusterState(ClusterState state) {
124+
try {
125+
if (snapshotInProgress(state)) {
126+
observer.waitForNextChange(this);
127+
} else {
128+
logger.debug("[{}] retrying ILM step after snapshot has completed", indexName);
129+
reRun.accept(state);
130+
}
131+
} catch (Exception e) {
132+
exceptionConsumer.accept(e);
133+
}
134+
}
135+
136+
private boolean snapshotInProgress(ClusterState state) {
137+
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
138+
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
139+
// No snapshots are running, new state is acceptable to proceed
140+
return false;
141+
}
142+
143+
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
144+
if (snapshot.indices().stream()
145+
.map(IndexId::getName)
146+
.anyMatch(name -> name.equals(indexName))) {
147+
// There is a snapshot running with this index name
148+
return true;
149+
}
150+
}
151+
// There are snapshots, but none for this index, so it's okay to proceed with this state
152+
return false;
153+
}
154+
155+
@Override
156+
public void onClusterServiceClose() {
157+
// This means the cluster is being shut down, so nothing to do here
158+
}
159+
160+
@Override
161+
public void onTimeout(TimeValue timeout) {
162+
exceptionConsumer.accept(new IllegalStateException("step timed out while waiting for snapshots to complete"));
163+
}
164+
}
165+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/CloseFollowerIndexStep.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,14 @@
88
import org.elasticsearch.action.ActionListener;
99
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
1010
import org.elasticsearch.client.Client;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexMetaData;
1113

12-
final class CloseFollowerIndexStep extends AbstractUnfollowIndexStep {
14+
import java.util.Map;
15+
16+
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
17+
18+
final class CloseFollowerIndexStep extends AsyncRetryDuringSnapshotActionStep {
1319

1420
static final String NAME = "close-follower-index";
1521

@@ -18,7 +24,14 @@ final class CloseFollowerIndexStep extends AbstractUnfollowIndexStep {
1824
}
1925

2026
@Override
21-
void innerPerformAction(String followerIndex, Listener listener) {
27+
void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) {
28+
String followerIndex = indexMetaData.getIndex().getName();
29+
Map<String, String> customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY);
30+
if (customIndexMetadata == null) {
31+
listener.onResponse(true);
32+
return;
33+
}
34+
2235
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex);
2336
getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap(
2437
r -> {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/DeleteStep.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
/**
1515
* Deletes a single index.
1616
*/
17-
public class DeleteStep extends AsyncActionStep {
17+
public class DeleteStep extends AsyncRetryDuringSnapshotActionStep {
1818
public static final String NAME = "delete";
1919

2020
public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
2121
super(key, nextStepKey, client);
2222
}
2323

2424
@Override
25-
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
25+
public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
2626
getClient().admin().indices()
2727
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()),
28-
ActionListener.wrap(response -> listener.onResponse(true) , listener::onFailure));
28+
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
2929
}
3030

3131
@Override

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/ForceMergeStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
1010
import org.elasticsearch.client.Client;
1111
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.ClusterStateObserver;
1213
import org.elasticsearch.cluster.metadata.IndexMetaData;
1314

1415
import java.util.Objects;
@@ -30,7 +31,7 @@ public int getMaxNumSegments() {
3031
}
3132

3233
@Override
33-
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
34+
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
3435
ForceMergeRequest request = new ForceMergeRequest(indexMetaData.getIndex().getName());
3536
request.maxNumSegments(maxNumSegments);
3637
getClient().admin().indices()

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/FreezeStep.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
/**
1515
* Freezes an index.
1616
*/
17-
public class FreezeStep extends AsyncActionStep {
17+
public class FreezeStep extends AsyncRetryDuringSnapshotActionStep {
1818
public static final String NAME = "freeze";
1919

2020
public FreezeStep(StepKey key, StepKey nextStepKey, Client client) {
2121
super(key, nextStepKey, client);
2222
}
2323

2424
@Override
25-
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
25+
public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
2626
getClient().admin().indices().execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
2727
new TransportFreezeIndexAction.FreezeRequest(indexMetaData.getIndex().getName()),
2828
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/OpenFollowerIndexStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
1010
import org.elasticsearch.client.Client;
1111
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.ClusterStateObserver;
1213
import org.elasticsearch.cluster.metadata.IndexMetaData;
1314

1415
final class OpenFollowerIndexStep extends AsyncActionStep {
@@ -20,7 +21,8 @@ final class OpenFollowerIndexStep extends AsyncActionStep {
2021
}
2122

2223
@Override
23-
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) {
24+
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
25+
ClusterStateObserver observer, Listener listener) {
2426
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName());
2527
getClient().admin().indices().open(request, ActionListener.wrap(
2628
r -> {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/RolloverStep.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
1212
import org.elasticsearch.client.Client;
1313
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.ClusterStateObserver;
1415
import org.elasticsearch.cluster.metadata.IndexMetaData;
1516
import org.elasticsearch.common.Strings;
1617

@@ -30,7 +31,8 @@ public RolloverStep(StepKey key, StepKey nextStepKey, Client client) {
3031
}
3132

3233
@Override
33-
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState, Listener listener) {
34+
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
35+
ClusterStateObserver observer, Listener listener) {
3436
boolean indexingComplete = LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE_SETTING.get(indexMetaData.getSettings());
3537
if (indexingComplete) {
3638
logger.trace(indexMetaData.getIndex() + " has lifecycle complete set, skipping " + RolloverStep.NAME);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexlifecycle/SetSingleNodeAllocateStep.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1010
import org.elasticsearch.client.Client;
1111
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.ClusterStateObserver;
1213
import org.elasticsearch.cluster.metadata.IndexMetaData;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.cluster.routing.RoutingNode;
@@ -42,7 +43,7 @@ public SetSingleNodeAllocateStep(StepKey key, StepKey nextStepKey, Client client
4243
}
4344

4445
@Override
45-
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, Listener listener) {
46+
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
4647
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, clusterState.getRoutingNodes(), clusterState, null,
4748
System.nanoTime());
4849
List<String> validNodeIds = new ArrayList<>();

0 commit comments

Comments
 (0)