Skip to content

Commit 547033b

Browse files
authored
The watcher indexing listener didn't handle document level exceptions. (#51466)
Prior to the change the watcher index listener didn't implement the `postIndex(ShardId, Engine.Index, Engine.IndexResult)` method. This caused document level exceptions like VersionConflictEngineException to be ignored. This commit fixes this. The watcher indexing listener did implement the `postIndex(ShardId, Engine.Index, Exception)` method, but that only handles engine level exceptions. This change also unmutes the SmokeTestWatcherTestSuiteIT#testMonitorClusterHealth test again. Relates to #32299
1 parent 0952c21 commit 547033b

File tree

3 files changed

+48
-9
lines changed

3 files changed

+48
-9
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,19 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
135135
}
136136

137137
/**
138-
*
139-
* In case of an error, we have to ensure that the triggerservice does not leave anything behind
138+
* In case of a document related failure (for example version conflict), then clean up resources for a watch
139+
* in the trigger service.
140+
*/
141+
@Override
142+
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
143+
if (result.getResultType() == Engine.Result.Type.FAILURE) {
144+
assert result.getFailure() != null;
145+
postIndex(shardId, index, result.getFailure());
146+
}
147+
}
148+
149+
/**
150+
* In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind
140151
*
141152
* TODO: If the configuration changes between preindex and postindex methods and we add a
142153
* watch, that could not be indexed
@@ -153,10 +164,6 @@ public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
153164
if (isWatchDocument(shardId.getIndexName())) {
154165
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
155166
triggerService.remove(index.id());
156-
} else {
157-
logger.debug(
158-
() -> new ParameterizedMessage("not removing watch [{}] from trigger, because [{}] != [{}] configuration.active=[{}]",
159-
index.id(), shardId.getIndexName(), configuration.index, configuration.active), ex);
160167
}
161168
}
162169

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,15 +200,48 @@ public void testPreIndexCheckParsingException() throws Exception {
200200
assertThat(exc.getMessage(), containsString(id));
201201
}
202202

203-
public void testPostIndexRemoveTriggerOnException() throws Exception {
203+
public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Exception {
204+
when(operation.id()).thenReturn("_id");
205+
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);
206+
when(result.getFailure()).thenReturn(new RuntimeException());
207+
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
208+
209+
listener.postIndex(shardId, operation, result);
210+
verify(triggerService).remove(eq("_id"));
211+
}
212+
213+
public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception {
214+
List<Engine.Result.Type> types = new ArrayList<>(List.of(Engine.Result.Type.values()));
215+
types.remove(Engine.Result.Type.FAILURE);
216+
217+
when(operation.id()).thenReturn("_id");
218+
when(result.getResultType()).thenReturn(randomFrom(types));
219+
when(result.getFailure()).thenReturn(new RuntimeException());
220+
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
221+
222+
listener.postIndex(shardId, operation, result);
223+
verifyZeroInteractions(triggerService);
224+
}
225+
226+
public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreNonWatcherDocument() throws Exception {
227+
when(operation.id()).thenReturn("_id");
228+
when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE);
229+
when(result.getFailure()).thenReturn(new RuntimeException());
230+
when(shardId.getIndexName()).thenReturn(randomAlphaOfLength(4));
231+
232+
listener.postIndex(shardId, operation, result);
233+
verifyZeroInteractions(triggerService);
234+
}
235+
236+
public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception {
204237
when(operation.id()).thenReturn("_id");
205238
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
206239

207240
listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
208241
verify(triggerService).remove(eq("_id"));
209242
}
210243

211-
public void testPostIndexDontInvokeForOtherDocuments() throws Exception {
244+
public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {
212245
when(operation.id()).thenReturn("_id");
213246
when(shardId.getIndexName()).thenReturn("anything");
214247
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ protected Settings restAdminSettings() {
110110
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
111111
}
112112

113-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32299")
114113
public void testMonitorClusterHealth() throws Exception {
115114
final String watchId = "cluster_health_watch";
116115

0 commit comments

Comments
 (0)