
Commit 13880bd

Watcher: Reload properly on remote shard change (#33167)
When a node that carries a watcher shard dies, or a shard is relocated to another node, watcher must trigger a reload not only on the node where the shard change happened, but also on the other nodes that hold copies of this shard, as different watches may need to be loaded there. This commit takes changes on remote nodes into account by storing a list of ShardRoutings based on the local active shards in the WatcherLifeCycleService, instead of only the local shard allocation ids.

This also fixes some tests that relied on a wrong assumption: using `TestShardRouting.newShardRouting` for cluster state creation always created new allocation ids, which implicitly led to a reload.
1 parent 0f22dbb commit 13880bd
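The reload decision described above is easiest to see in isolation. Below is a minimal, self-contained sketch of the idea; the `Routing` record, the `relevantRoutings` helper, and the node names are hypothetical stand-ins for `ShardRouting` and the real routing table, used only to illustrate why comparing all routings of locally held shards catches remote changes that a purely local allocation-id comparison would miss.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Minimal stand-in for ShardRouting: a shard id plus the node that holds the copy.
// This record is illustrative only, not the Elasticsearch class.
record Routing(int shardId, String nodeId) {}

public class ReloadCheckSketch {

    // Collect every routing whose shard id is also held locally, in a stable order,
    // so that a lost or added remote copy of a locally held shard changes the result.
    static List<Routing> relevantRoutings(List<Routing> allActiveRoutings, String localNodeId) {
        Set<Integer> localShardIds = allActiveRoutings.stream()
                .filter(r -> r.nodeId().equals(localNodeId))
                .map(Routing::shardId)
                .collect(Collectors.toSet());
        return allActiveRoutings.stream()
                .filter(r -> localShardIds.contains(r.shardId()))
                // like ShardRouting, Routing is not Comparable; any deterministic
                // order works because the lists are only compared for equality
                .sorted(Comparator.comparing(Routing::hashCode))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String localNodeId = "node_1";
        // shard 0 has a copy on the local node and one on node_2
        List<Routing> before = List.of(new Routing(0, "node_1"), new Routing(0, "node_2"));
        // node_2 dies: the local copy is untouched, but the remote copy is gone
        List<Routing> after = List.of(new Routing(0, "node_1"));

        // Comparing only local allocation ids would see no change here; comparing
        // all routings of locally held shards detects it and triggers a reload.
        boolean reloadNeeded = !relevantRoutings(before, localNodeId)
                .equals(relevantRoutings(after, localNodeId));
        System.out.println("reload needed: " + reloadNeeded); // prints: reload needed: true
    }
}
```

The hashCode-based sort mirrors the commit's approach: the order itself is arbitrary and only needs to be deterministic, since the sorted lists are compared for equality, never iterated in a meaningful order.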

File tree

2 files changed (+98, -21 lines)


x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java

Lines changed: 19 additions & 12 deletions
```diff
@@ -11,7 +11,6 @@
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.service.ClusterService;
```
```diff
@@ -22,13 +21,16 @@
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayService;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
 import org.elasticsearch.xpack.core.watcher.WatcherState;
 import org.elasticsearch.xpack.core.watcher.watch.Watch;
 import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;

 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;

```
```diff
@@ -45,7 +47,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
         Setting.boolSetting("xpack.watcher.require_manual_start", false, Property.NodeScope);

     private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
-    private final AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
+    private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
     private final boolean requireManualStart;
     private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
     private volatile WatcherService watcherService;
```
```diff
@@ -144,15 +146,20 @@ public void clusterChanged(ClusterChangedEvent event) {
             return;
         }

-        List<String> currentAllocationIds = localShards.stream()
-            .map(ShardRouting::allocationId)
-            .map(AllocationId::getId)
-            .sorted()
+        // also check if non-local shards have changed, as losing a shard on a
+        // remote node or adding a replica on a remote node needs to trigger a reload too
+        Set<ShardId> localShardIds = localShards.stream().map(ShardRouting::shardId).collect(Collectors.toSet());
+        List<ShardRouting> allShards = event.state().routingTable().index(watchIndex).shardsWithState(STARTED);
+        allShards.addAll(event.state().routingTable().index(watchIndex).shardsWithState(RELOCATING));
+        List<ShardRouting> localAffectedShardRoutings = allShards.stream()
+            .filter(shardRouting -> localShardIds.contains(shardRouting.shardId()))
+            // ShardRouting is not comparable, so we need some ordering mechanism
+            .sorted(Comparator.comparing(ShardRouting::hashCode))
             .collect(Collectors.toList());

-        if (previousAllocationIds.get().equals(currentAllocationIds) == false) {
+        if (previousShardRoutings.get().equals(localAffectedShardRoutings) == false) {
             if (watcherService.validate(event.state())) {
-                previousAllocationIds.set(Collections.unmodifiableList(currentAllocationIds));
+                previousShardRoutings.set(localAffectedShardRoutings);
                 if (state.get() == WatcherState.STARTED) {
                     watcherService.reload(event.state(), "new local watcher shard allocation ids");
                 } else if (state.get() == WatcherState.STOPPED) {
```
```diff
@@ -187,13 +194,13 @@ private boolean isWatcherStoppedManually(ClusterState state) {
      * @return true, if existing allocation ids were cleaned out, false otherwise
      */
     private boolean clearAllocationIds() {
-        List<String> previousIds = previousAllocationIds.getAndSet(Collections.emptyList());
-        return previousIds.equals(Collections.emptyList()) == false;
+        List<ShardRouting> previousIds = previousShardRoutings.getAndSet(Collections.emptyList());
+        return previousIds.isEmpty() == false;
     }

     // for testing purposes only
-    List<String> allocationIds() {
-        return previousAllocationIds.get();
+    List<ShardRouting> shardRoutings() {
+        return previousShardRoutings.get();
     }

     public WatcherState getState() {
```

x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java

Lines changed: 79 additions & 9 deletions
```diff
@@ -254,9 +254,12 @@ public void testReplicaWasAddedOrRemoved() {
             .add(newNode("node_2"))
             .build();

+        ShardRouting firstShardOnSecondNode = TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED);
+        ShardRouting secondShardOnFirstNode = TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED);
+
         IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
             .build();

         IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
```
```diff
@@ -273,10 +276,19 @@
             .metaData(MetaData.builder().put(indexMetaData, false))
             .build();

+        // randomly add a replica, either on the local node or on the remote one
+        boolean addShardOnLocalNode = randomBoolean();
+        final ShardRouting addedShardRouting;
+        if (addShardOnLocalNode) {
+            addedShardRouting = TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED);
+        } else {
+            addedShardRouting = TestShardRouting.newShardRouting(secondShardId, "node_2", false, STARTED);
+        }
+
         IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_1", false, STARTED))
-            .addShard(TestShardRouting.newShardRouting(secondShardId, "node_1", true, STARTED))
-            .addShard(TestShardRouting.newShardRouting(shardId, "node_2", true, STARTED))
+            .addShard(secondShardOnFirstNode)
+            .addShard(firstShardOnSecondNode)
+            .addShard(addedShardRouting)
             .build();

         ClusterState stateWithReplicaAdded = ClusterState.builder(new ClusterName("my-cluster"))
```
```diff
@@ -477,7 +489,67 @@ public void testDataNodeWithoutDataCanStart() {
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
     }

-    private ClusterState startWatcher() {
+    // this emulates a node outage somewhere in the cluster that carried a watcher shard
+    // the number of shards remains the same, but we need to ensure that watcher properly reloads
+    // previously we only checked the local shard allocations, but we also need to check if shards in the cluster have changed
+    public void testWatcherReloadsOnNodeOutageWithWatcherShard() {
+        Index watchIndex = new Index(Watch.INDEX, "foo");
+        ShardId shardId = new ShardId(watchIndex, 0);
+        String localNodeId = randomFrom("node_1", "node_2");
+        String outageNodeId = localNodeId.equals("node_1") ? "node_2" : "node_1";
+        DiscoveryNodes previousDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .add(newNode(outageNodeId))
+            .build();
+
+        ShardRouting replicaShardRouting = TestShardRouting.newShardRouting(shardId, localNodeId, false, STARTED);
+        ShardRouting primaryShardRouting = TestShardRouting.newShardRouting(shardId, outageNodeId, true, STARTED);
+        IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(replicaShardRouting)
+            .addShard(primaryShardRouting)
+            .build();
+
+        IndexMetaData indexMetaData = IndexMetaData.builder(Watch.INDEX)
+            .settings(Settings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 6)
+            ).build();
+
+        ClusterState previousState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(previousDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(previousWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        ShardRouting nowPrimaryShardRouting = replicaShardRouting.moveActiveReplicaToPrimary();
+        IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable.builder(watchIndex)
+            .addShard(nowPrimaryShardRouting)
+            .build();
+
+        DiscoveryNodes currentDiscoveryNodes = new DiscoveryNodes.Builder().masterNodeId(localNodeId).localNodeId(localNodeId)
+            .add(newNode(localNodeId))
+            .build();
+
+        ClusterState currentState = ClusterState.builder(new ClusterName("my-cluster"))
+            .nodes(currentDiscoveryNodes)
+            .routingTable(RoutingTable.builder().add(currentWatchRoutingTable).build())
+            .metaData(MetaData.builder().put(indexMetaData, false))
+            .build();
+
+        // initialize the previous state, so all the allocation ids are loaded
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        lifeCycleService.clusterChanged(new ClusterChangedEvent("whatever", previousState, currentState));
+
+        reset(watcherService);
+        when(watcherService.validate(anyObject())).thenReturn(true);
+        ClusterChangedEvent event = new ClusterChangedEvent("whatever", currentState, previousState);
+        lifeCycleService.clusterChanged(event);
+        verify(watcherService).reload(eq(event.state()), anyString());
+    }
+
+    private void startWatcher() {
         Index index = new Index(Watch.INDEX, "uuid");
         IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
         indexRoutingTableBuilder.addShard(
```
```diff
@@ -506,12 +578,10 @@ private ClusterState startWatcher() {
         lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, emptyState));
         assertThat(lifeCycleService.getState(), is(WatcherState.STARTED));
         verify(watcherService, times(1)).reload(eq(state), anyString());
-        assertThat(lifeCycleService.allocationIds(), hasSize(1));
+        assertThat(lifeCycleService.shardRoutings(), hasSize(1));

         // reset the mock, the user has to mock everything themselves again
         reset(watcherService);
-
-        return state;
     }

     private List<String> randomIndexPatterns() {
```
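A side note on the test fix above: as the commit message explains, every call to `TestShardRouting.newShardRouting` mints a fresh allocation id, so two routings built for the "same" shard never compare equal. The sketch below models that behavior with a hypothetical `FakeRouting` record (not the real test helper) to show why the tests now build each `ShardRouting` once and reuse the instance across cluster states.

```java
import java.util.UUID;

// Hypothetical model of the pitfall; not the Elasticsearch test helper.
// Each factory call mints a fresh allocation id, mirroring what the commit
// message says about TestShardRouting.newShardRouting.
record FakeRouting(int shardId, String nodeId, String allocationId) {
    static FakeRouting newRouting(int shardId, String nodeId) {
        return new FakeRouting(shardId, nodeId, UUID.randomUUID().toString());
    }
}

public class AllocationIdPitfallSketch {
    public static void main(String[] args) {
        // the "same" shard on the same node, built twice
        FakeRouting first = FakeRouting.newRouting(0, "node_1");
        FakeRouting second = FakeRouting.newRouting(0, "node_1");
        System.out.println(first.equals(second)); // false: the allocation ids differ

        // Reusing one instance across both cluster states, as the fixed tests do,
        // avoids this spurious difference and the implicit reload it caused.
        System.out.println(first.equals(first)); // true
    }
}
```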
