Skip to content

Commit 017779b

Browse files
spinscalehub-cap
authored andcommitted
Watcher: Prevent triggering watch when using activate API (#30613)
A wrong check in the activate watch API could lead to watches triggering on wrong nodes. The check was supposed to check if watch execution was distributed already in 6.x and only if not, then trigger locally. The if-condition however was broken and triggered the watch only when distributed watch execution was actually enabled.
1 parent 511856d commit 017779b

File tree

2 files changed

+192
-1
lines changed

2 files changed

+192
-1
lines changed

x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class TransportActivateWatchAction extends WatcherTransportAction<Activat
5454
private final WatchParser parser;
5555
private final Client client;
5656
private final TriggerService triggerService;
57+
private final ClusterService clusterService;
5758

5859
@Inject
5960
public TransportActivateWatchAction(Settings settings, TransportService transportService, ThreadPool threadPool,
@@ -64,6 +65,7 @@ public TransportActivateWatchAction(Settings settings, TransportService transpor
6465
licenseState, clusterService, ActivateWatchRequest::new, ActivateWatchResponse::new);
6566
this.clock = clock;
6667
this.parser = parser;
68+
this.clusterService = clusterService;
6769
this.client = client;
6870
this.triggerService = triggerService;
6971
}
@@ -95,7 +97,8 @@ protected void masterOperation(ActivateWatchRequest request, ClusterState state,
9597
XContentType.JSON);
9698
watch.version(getResponse.getVersion());
9799
watch.status().version(getResponse.getVersion());
98-
if (localExecute(request)) {
100+
// if we are not yet running in distributed mode, only call triggerservice, if we are on the master node
101+
if (localExecute(request) == false && this.clusterService.state().nodes().isLocalNodeElectedMaster()) {
99102
if (watch.status().state().isActive()) {
100103
triggerService.add(watch);
101104
} else {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.watcher.transport.actions.activate;
7+
8+
import org.elasticsearch.Version;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.DocWriteResponse;
11+
import org.elasticsearch.action.get.GetRequest;
12+
import org.elasticsearch.action.get.GetResponse;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.PlainActionFuture;
15+
import org.elasticsearch.action.update.UpdateRequest;
16+
import org.elasticsearch.action.update.UpdateResponse;
17+
import org.elasticsearch.client.Client;
18+
import org.elasticsearch.cluster.ClusterName;
19+
import org.elasticsearch.cluster.ClusterState;
20+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
21+
import org.elasticsearch.cluster.node.DiscoveryNode;
22+
import org.elasticsearch.cluster.node.DiscoveryNodes;
23+
import org.elasticsearch.cluster.service.ClusterService;
24+
import org.elasticsearch.common.settings.Settings;
25+
import org.elasticsearch.common.util.concurrent.ThreadContext;
26+
import org.elasticsearch.index.Index;
27+
import org.elasticsearch.index.get.GetResult;
28+
import org.elasticsearch.index.shard.ShardId;
29+
import org.elasticsearch.license.XPackLicenseState;
30+
import org.elasticsearch.test.ESTestCase;
31+
import org.elasticsearch.threadpool.ThreadPool;
32+
import org.elasticsearch.transport.TransportService;
33+
import org.elasticsearch.xpack.core.watcher.transport.actions.activate.ActivateWatchRequest;
34+
import org.elasticsearch.xpack.core.watcher.transport.actions.activate.ActivateWatchResponse;
35+
import org.elasticsearch.xpack.core.watcher.watch.ClockMock;
36+
import org.elasticsearch.xpack.core.watcher.watch.Watch;
37+
import org.elasticsearch.xpack.core.watcher.watch.WatchStatus;
38+
import org.elasticsearch.xpack.watcher.test.WatchExecutionContextMockBuilder;
39+
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
40+
import org.elasticsearch.xpack.watcher.watch.WatchParser;
41+
import org.joda.time.DateTime;
42+
import org.joda.time.DateTimeZone;
43+
import org.junit.Before;
44+
45+
import java.util.Collections;
46+
import java.util.HashSet;
47+
48+
import static java.util.Arrays.asList;
49+
import static org.mockito.Matchers.any;
50+
import static org.mockito.Matchers.anyObject;
51+
import static org.mockito.Matchers.eq;
52+
import static org.mockito.Mockito.doAnswer;
53+
import static org.mockito.Mockito.mock;
54+
import static org.mockito.Mockito.verify;
55+
import static org.mockito.Mockito.verifyNoMoreInteractions;
56+
import static org.mockito.Mockito.when;
57+
58+
public class TransportActivateWatchActionTests extends ESTestCase {
59+
60+
private TransportActivateWatchAction action;
61+
private Watch watch = new WatchExecutionContextMockBuilder("watch_id").buildMock().watch();
62+
private ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
63+
private TriggerService triggerService = mock(TriggerService.class);
64+
private ClusterService clusterService = mock(ClusterService.class);
65+
66+
@Before
67+
public void setupAction() throws Exception {
68+
ThreadPool threadPool = mock(ThreadPool.class);
69+
when(threadPool.getThreadContext()).thenReturn(threadContext);
70+
71+
TransportService transportService = mock(TransportService.class);
72+
73+
WatchParser parser = mock(WatchParser.class);
74+
when(parser.parseWithSecrets(eq("watch_id"), eq(true), anyObject(), anyObject(), anyObject())).thenReturn(watch);
75+
76+
Client client = mock(Client.class);
77+
when(client.threadPool()).thenReturn(threadPool);
78+
79+
// mock an update response that calls the listener
80+
doAnswer(invocation -> {
81+
UpdateRequest request = (UpdateRequest) invocation.getArguments()[0];
82+
ActionListener<UpdateResponse> listener = (ActionListener) invocation.getArguments()[1];
83+
84+
ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), 0);
85+
listener.onResponse(new UpdateResponse(shardId, request.type(), request.id(), request.version(),
86+
DocWriteResponse.Result.UPDATED));
87+
88+
return null;
89+
}).when(client).update(any(), any());
90+
91+
// mock an get response that calls the listener
92+
doAnswer(invocation -> {
93+
GetRequest request = (GetRequest) invocation.getArguments()[0];
94+
ActionListener<GetResponse> listener = (ActionListener) invocation.getArguments()[1];
95+
96+
GetResult getResult = new GetResult(request.index(), request.type(), request.id(), request.version(), true, null,
97+
Collections.emptyMap());
98+
listener.onResponse(new GetResponse(getResult));
99+
100+
return null;
101+
}).when(client).get(any(), any());
102+
103+
action = new TransportActivateWatchAction(Settings.EMPTY, transportService, threadPool,
104+
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(Settings.EMPTY), new ClockMock(),
105+
new XPackLicenseState(Settings.EMPTY), parser, clusterService, client, triggerService);
106+
}
107+
108+
// when running in distributed mode, watches are only triggered by the indexing operation listener
109+
public void testWatchesAreNotTriggeredWhenDistributed() throws Exception {
110+
boolean watchActivated = randomBoolean();
111+
ActivateWatchRequest request = new ActivateWatchRequest("watch_id", watchActivated);
112+
ActionListener<ActivateWatchResponse> listener = PlainActionFuture.newFuture();
113+
114+
// add a few nodes, with current versions
115+
ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
116+
.nodes(DiscoveryNodes.builder()
117+
.masterNodeId("node_1")
118+
.localNodeId(randomFrom("node_1", "node_2"))
119+
.add(newNode("node_1", Version.CURRENT))
120+
.add(newNode("node_2", Version.CURRENT)))
121+
.build();
122+
when(clusterService.state()).thenReturn(clusterState);
123+
mockWatchStatus(watchActivated);
124+
125+
action.masterOperation(request, clusterState, listener);
126+
127+
verifyNoMoreInteractions(triggerService);
128+
}
129+
130+
public void testWatchesAreNotTriggeredOnNonMasterWhenNotDistributed() throws Exception {
131+
boolean watchActivated = randomBoolean();
132+
ActivateWatchRequest request = new ActivateWatchRequest("watch_id", watchActivated);
133+
ActionListener<ActivateWatchResponse> listener = PlainActionFuture.newFuture();
134+
135+
// add a few nodes, with current versions
136+
ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
137+
.nodes(DiscoveryNodes.builder()
138+
.masterNodeId("node_2")
139+
.localNodeId("node_1")
140+
.add(newNode("node_1", Version.CURRENT))
141+
.add(newNode("node_2", Version.V_5_6_10)))
142+
.build();
143+
when(clusterService.state()).thenReturn(clusterState);
144+
mockWatchStatus(watchActivated);
145+
146+
action.masterOperation(request, clusterState, listener);
147+
148+
verifyNoMoreInteractions(triggerService);
149+
}
150+
151+
// we trigger on the master node only, not on any other node
152+
public void testWatchesAreTriggeredOnMasterWhenNotDistributed() throws Exception {
153+
boolean watchActivated = randomBoolean();
154+
ActivateWatchRequest request = new ActivateWatchRequest("watch_id", watchActivated);
155+
ActionListener<ActivateWatchResponse> listener = PlainActionFuture.newFuture();
156+
157+
// add a few nodes, with current versions
158+
ClusterState clusterState = ClusterState.builder(new ClusterName("my_cluster"))
159+
.nodes(DiscoveryNodes.builder()
160+
.masterNodeId("node_1")
161+
.localNodeId("node_1")
162+
.add(newNode("node_1", Version.CURRENT))
163+
.add(newNode("node_2", Version.V_5_6_10)))
164+
.build();
165+
when(clusterService.state()).thenReturn(clusterState);
166+
mockWatchStatus(watchActivated);
167+
168+
action.masterOperation(request, clusterState, listener);
169+
170+
if (watchActivated) {
171+
verify(triggerService).add(eq(watch));
172+
} else {
173+
verify(triggerService).remove(eq("watch_id"));
174+
}
175+
}
176+
177+
private void mockWatchStatus(boolean active) {
178+
WatchStatus status = mock(WatchStatus.class);
179+
WatchStatus.State state = new WatchStatus.State(active, DateTime.now(DateTimeZone.UTC));
180+
when(status.state()).thenReturn(state);
181+
when(watch.status()).thenReturn(status);
182+
}
183+
184+
private static DiscoveryNode newNode(String nodeId, Version version) {
185+
return new DiscoveryNode(nodeId, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
186+
new HashSet<>(asList(DiscoveryNode.Role.values())), version);
187+
}
188+
}

0 commit comments

Comments
 (0)