Skip to content

Commit ab1f6c9

Browse files
authored
Resolve concurrency with watcher trigger service (#39167)
The watcher trigger service could attempt to modify the perWatchStats map simultaneously from multiple threads. This would cause the internal state to become inconsistent, in particular the count() method may return an incorrect value for the number of watches. This changes replaces the implementation of the map with a ConcurrentHashMap so that its internal state remains consistent even when accessed from mutiple threads. Backport of: #39092
1 parent 586db5e commit ab1f6c9

File tree

3 files changed

+4
-2
lines changed

3 files changed

+4
-2
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/trigger/TriggerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.List;
2020
import java.util.Map;
2121
import java.util.Set;
22+
import java.util.concurrent.ConcurrentHashMap;
2223
import java.util.concurrent.CopyOnWriteArrayList;
2324
import java.util.function.Consumer;
2425

@@ -29,7 +30,7 @@ public class TriggerService {
2930

3031
private final GroupedConsumer consumer = new GroupedConsumer();
3132
private final Map<String, TriggerEngine> engines;
32-
private final Map<String, TriggerWatchStats> perWatchStats = new HashMap<>();
33+
private final Map<String, TriggerWatchStats> perWatchStats = new ConcurrentHashMap<>();
3334

3435
public TriggerService(Set<TriggerEngine> engines) {
3536
Map<String, TriggerEngine> builder = new HashMap<>();

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,10 @@ public void testPausingWatcherServiceAlsoPausesTriggerService() {
231231
Trigger trigger = mock(Trigger.class);
232232
when(trigger.type()).thenReturn(engineType);
233233

234+
final String id = randomAlphaOfLengthBetween(3, 12);
234235
Watch watch = mock(Watch.class);
235236
when(watch.trigger()).thenReturn(trigger);
237+
when(watch.id()).thenReturn(id);
236238
when(watch.condition()).thenReturn(InternalAlwaysCondition.INSTANCE);
237239
ExecutableNoneInput noneInput = new ExecutableNoneInput();
238240
when(watch.input()).thenReturn(noneInput);

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ public void testLoadMalformedWatchRecord() throws Exception {
141141
});
142142
}
143143

144-
@AwaitsFix(bugUrl = "Supposedly fixed; https://github.com/elastic/x-pack-elasticsearch/issues/1915")
145144
public void testLoadExistingWatchesUponStartup() throws Exception {
146145
stopWatcher();
147146

0 commit comments

Comments
 (0)