Skip to content

Commit 39d0d7b

Browse files
benwtrentAdam Locke
authored andcommitted
[ML] wait for .ml-state-write alias to be readable (elastic#79731)
In tests and actual usage, it is possible that one job creates the .ml-state-write and another starts immediately afterwards, sees that the index is created, and moves on. But, what this means, is that the second job could blast past the check and the job starts/stops/etc. all with the .ml-state-write alias pointing to an index that is not even readable. This commit waits for the index to be yellow before continuing opening the job. closes: elastic#79636
1 parent 17cd41d commit 39d0d7b

File tree

5 files changed

+92
-12
lines changed

5 files changed

+92
-12
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/annotations/AnnotationIndex.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
import org.elasticsearch.ResourceAlreadyExistsException;
1313
import org.elasticsearch.Version;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
1516
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
16-
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
1717
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1818
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
1919
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -68,10 +68,13 @@ public static void createAnnotationsIndexIfNecessaryAndWaitForYellow(Client clie
6868
final ClusterHealthRequest request = Requests.clusterHealthRequest(READ_ALIAS_NAME)
6969
.waitForYellowStatus()
7070
.masterNodeTimeout(masterNodeTimeout);
71-
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request,
72-
ActionListener.<ClusterHealthResponse>wrap(
73-
r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure),
74-
client.admin().cluster()::health);
71+
executeAsyncWithOrigin(
72+
client,
73+
ML_ORIGIN,
74+
ClusterHealthAction.INSTANCE,
75+
request,
76+
ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure)
77+
);
7578
}, finalListener::onFailure);
7679

7780
createAnnotationsIndexIfNecessary(client, state, masterNodeTimeout, annotationsIndexCreatedListener);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/AnomalyDetectorsIndex.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,19 @@
88

99
import org.elasticsearch.Version;
1010
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
12+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
1113
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.client.Requests;
1215
import org.elasticsearch.cluster.ClusterState;
1316
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1417
import org.elasticsearch.core.TimeValue;
1518
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
1619
import org.elasticsearch.xpack.core.template.TemplateUtils;
1720

21+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
22+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
23+
1824
/**
1925
* Methods for handling index naming related functions
2026
*/
@@ -83,6 +89,35 @@ public static void createStateIndexAndAliasIfNecessary(Client client, ClusterSta
8389
finalListener);
8490
}
8591

92+
public static void createStateIndexAndAliasIfNecessaryAndWaitForYellow(Client client,
93+
ClusterState state,
94+
IndexNameExpressionResolver resolver,
95+
TimeValue masterNodeTimeout,
96+
final ActionListener<Boolean> finalListener) {
97+
final ActionListener<Boolean> stateIndexAndAliasCreated = ActionListener.wrap(success -> {
98+
final ClusterHealthRequest request = Requests.clusterHealthRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias())
99+
.waitForYellowStatus()
100+
.masterNodeTimeout(masterNodeTimeout);
101+
executeAsyncWithOrigin(
102+
client,
103+
ML_ORIGIN,
104+
ClusterHealthAction.INSTANCE,
105+
request,
106+
ActionListener.wrap(r -> finalListener.onResponse(r.isTimedOut() == false), finalListener::onFailure)
107+
);
108+
}, finalListener::onFailure);
109+
110+
MlIndexAndAlias.createIndexAndAliasIfNecessary(
111+
client,
112+
state,
113+
resolver,
114+
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
115+
AnomalyDetectorsIndex.jobStateIndexWriteAlias(),
116+
masterNodeTimeout,
117+
stateIndexAndAliasCreated
118+
);
119+
}
120+
86121
public static String wrappedResultsMapping() {
87122
return "{\n\"_doc\" : " + resultsMapping() + "\n}";
88123
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DataFrameAnalyticsManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,13 @@ public void execute(DataFrameAnalyticsTask task, ClusterState clusterState, Time
130130
);
131131

132132
// Make sure the state index and alias exist
133-
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(new ParentTaskAssigningClient(client, task.getParentTaskId()),
134-
clusterState, expressionResolver, masterNodeTimeout, stateAliasListener);
133+
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow(
134+
new ParentTaskAssigningClient(client, task.getParentTaskId()),
135+
clusterState,
136+
expressionResolver,
137+
masterNodeTimeout,
138+
stateAliasListener
139+
);
135140
}
136141

137142
private void createStatsIndexAndUpdateMappingsIfNecessary(Client client, ClusterState clusterState, TimeValue masterNodeTimeout,

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,11 +525,19 @@ public void openJob(JobTask jobTask, ClusterState clusterState, TimeValue master
525525
}
526526
);
527527

528-
// Make sure the state index and alias exist
528+
// Make sure the state index and alias exist and are writeable
529529
ActionListener<Boolean> resultsMappingUpdateHandler = ActionListener.wrap(
530-
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, masterNodeTimeout,
531-
stateAliasHandler),
532-
e -> closeHandler.accept(e, true)
530+
ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessaryAndWaitForYellow(
531+
client,
532+
clusterState,
533+
expressionResolver,
534+
masterNodeTimeout,
535+
stateAliasHandler
536+
),
537+
e -> {
538+
logger.error(new ParameterizedMessage("[{}] ML state index alias could not be updated", jobId), e);
539+
closeHandler.accept(e, true);
540+
}
533541
);
534542

535543
// Try adding the results doc mapping - this updates to the latest version if an old mapping is present

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import org.elasticsearch.ElasticsearchException;
1010
import org.elasticsearch.Version;
1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
14+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1215
import org.elasticsearch.client.Client;
1316
import org.elasticsearch.cluster.ClusterState;
1417
import org.elasticsearch.cluster.metadata.AliasMetadata;
@@ -22,6 +25,7 @@
2225
import org.elasticsearch.common.util.concurrent.EsExecutors;
2326
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2427
import org.elasticsearch.common.util.concurrent.ThreadContext;
28+
import org.elasticsearch.core.TimeValue;
2529
import org.elasticsearch.xcontent.NamedXContentRegistry;
2630
import org.elasticsearch.xcontent.XContentType;
2731
import org.elasticsearch.env.Environment;
@@ -72,6 +76,8 @@
7276
import java.io.ByteArrayInputStream;
7377
import java.io.IOException;
7478
import java.io.InputStream;
79+
import java.lang.reflect.ParameterizedType;
80+
import java.lang.reflect.Type;
7581
import java.nio.charset.StandardCharsets;
7682
import java.util.Arrays;
7783
import java.util.Collections;
@@ -146,13 +152,36 @@ public class AutodetectProcessManagerTests extends ESTestCase {
146152
private Quantiles quantiles = new Quantiles("foo", new Date(), "state");
147153

148154
@Before
155+
@SuppressWarnings("unchecked")
149156
public void setup() throws Exception {
150157
Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
151158
client = mock(Client.class);
152-
153159
threadPool = mock(ThreadPool.class);
154160
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
155161
when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
162+
when(client.threadPool()).thenReturn(threadPool);
163+
doAnswer(invocationOnMock -> {
164+
if (invocationOnMock.getArguments()[0] instanceof ActionType<?>) {
165+
ActionType<?> v = (ActionType<?>) invocationOnMock.getArguments()[0];
166+
ActionListener<?> l = (ActionListener<?>) invocationOnMock.getArguments()[2];
167+
ParameterizedType parameterizedType = (ParameterizedType) v.getClass().getGenericSuperclass();
168+
Type t = parameterizedType.getActualTypeArguments()[0];
169+
if (t.getTypeName().contains("AcknowledgedResponse")) {
170+
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) l;
171+
listener.onResponse(AcknowledgedResponse.TRUE);
172+
return null;
173+
}
174+
if (t.getTypeName().contains("ClusterHealthResponse")) {
175+
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) l;
176+
listener.onResponse(
177+
new ClusterHealthResponse("test", new String[0], ClusterState.EMPTY_STATE, 0, 0, 0, TimeValue.ZERO, false)
178+
);
179+
return null;
180+
}
181+
fail("Mock not configured to handle generic type " + t.getTypeName());
182+
}
183+
return null;
184+
}).when(client).execute(any(), any(), any());
156185

157186
analysisRegistry = CategorizationAnalyzerTests.buildTestAnalysisRegistry(TestEnvironment.newEnvironment(settings));
158187
jobManager = mock(JobManager.class);

0 commit comments

Comments
 (0)