@@ -255,9 +255,12 @@ public void testReplicaWasAddedOrRemoved() {
255255 .add (newNode ("node_2" ))
256256 .build ();
257257
258+ ShardRouting firstShardOnSecondNode = TestShardRouting .newShardRouting (shardId , "node_2" , true , STARTED );
259+ ShardRouting secondShardOnFirstNode = TestShardRouting .newShardRouting (secondShardId , "node_1" , true , STARTED );
260+
258261 IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable .builder (watchIndex )
259- .addShard (TestShardRouting . newShardRouting ( secondShardId , "node_1" , true , STARTED ) )
260- .addShard (TestShardRouting . newShardRouting ( shardId , "node_2" , true , STARTED ) )
262+ .addShard (secondShardOnFirstNode )
263+ .addShard (firstShardOnSecondNode )
261264 .build ();
262265
263266 IndexMetaData indexMetaData = IndexMetaData .builder (Watch .INDEX )
@@ -274,10 +277,19 @@ public void testReplicaWasAddedOrRemoved() {
274277 .metaData (MetaData .builder ().put (indexMetaData , false ))
275278 .build ();
276279
280+ // add a replica in the local node
281+ boolean addShardOnLocalNode = randomBoolean ();
282+ final ShardRouting addedShardRouting ;
283+ if (addShardOnLocalNode ) {
284+ addedShardRouting = TestShardRouting .newShardRouting (shardId , "node_1" , false , STARTED );
285+ } else {
286+ addedShardRouting = TestShardRouting .newShardRouting (secondShardId , "node_2" , false , STARTED );
287+ }
288+
277289 IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable .builder (watchIndex )
278- .addShard (TestShardRouting . newShardRouting ( shardId , "node_1" , false , STARTED ) )
279- .addShard (TestShardRouting . newShardRouting ( secondShardId , "node_1" , true , STARTED ) )
280- .addShard (TestShardRouting . newShardRouting ( shardId , "node_2" , true , STARTED ) )
290+ .addShard (secondShardOnFirstNode )
291+ .addShard (firstShardOnSecondNode )
292+ .addShard (addedShardRouting )
281293 .build ();
282294
283295 ClusterState stateWithReplicaAdded = ClusterState .builder (new ClusterName ("my-cluster" ))
@@ -563,7 +575,67 @@ public void testDataNodeWithoutDataCanStart() {
563575 assertThat (lifeCycleService .getState (), is (WatcherState .STARTED ));
564576 }
565577
566- private ClusterState startWatcher () {
578+ // this emulates a node outage somewhere in the cluster that carried a watcher shard
579+ // the number of shards remains the same, but we need to ensure that watcher properly reloads
580+ // previously we only checked the local shard allocations, but we also need to check if shards in the cluster have changed
581+ public void testWatcherReloadsOnNodeOutageWithWatcherShard () {
582+ Index watchIndex = new Index (Watch .INDEX , "foo" );
583+ ShardId shardId = new ShardId (watchIndex , 0 );
584+ String localNodeId = randomFrom ("node_1" , "node_2" );
585+ String outageNodeId = localNodeId .equals ("node_1" ) ? "node_2" : "node_1" ;
586+ DiscoveryNodes previousDiscoveryNodes = new DiscoveryNodes .Builder ().masterNodeId (localNodeId ).localNodeId (localNodeId )
587+ .add (newNode (localNodeId ))
588+ .add (newNode (outageNodeId ))
589+ .build ();
590+
591+ ShardRouting replicaShardRouting = TestShardRouting .newShardRouting (shardId , localNodeId , false , STARTED );
592+ ShardRouting primartShardRouting = TestShardRouting .newShardRouting (shardId , outageNodeId , true , STARTED );
593+ IndexRoutingTable previousWatchRoutingTable = IndexRoutingTable .builder (watchIndex )
594+ .addShard (replicaShardRouting )
595+ .addShard (primartShardRouting )
596+ .build ();
597+
598+ IndexMetaData indexMetaData = IndexMetaData .builder (Watch .INDEX )
599+ .settings (Settings .builder ()
600+ .put (IndexMetaData .SETTING_NUMBER_OF_SHARDS , 1 )
601+ .put (IndexMetaData .SETTING_NUMBER_OF_REPLICAS , 0 )
602+ .put (IndexMetaData .SETTING_VERSION_CREATED , Version .CURRENT )
603+ .put (IndexMetaData .INDEX_FORMAT_SETTING .getKey (), 6 )
604+ ).build ();
605+
606+ ClusterState previousState = ClusterState .builder (new ClusterName ("my-cluster" ))
607+ .nodes (previousDiscoveryNodes )
608+ .routingTable (RoutingTable .builder ().add (previousWatchRoutingTable ).build ())
609+ .metaData (MetaData .builder ().put (indexMetaData , false ))
610+ .build ();
611+
612+ ShardRouting nowPrimaryShardRouting = replicaShardRouting .moveActiveReplicaToPrimary ();
613+ IndexRoutingTable currentWatchRoutingTable = IndexRoutingTable .builder (watchIndex )
614+ .addShard (nowPrimaryShardRouting )
615+ .build ();
616+
617+ DiscoveryNodes currentDiscoveryNodes = new DiscoveryNodes .Builder ().masterNodeId (localNodeId ).localNodeId (localNodeId )
618+ .add (newNode (localNodeId ))
619+ .build ();
620+
621+ ClusterState currentState = ClusterState .builder (new ClusterName ("my-cluster" ))
622+ .nodes (currentDiscoveryNodes )
623+ .routingTable (RoutingTable .builder ().add (currentWatchRoutingTable ).build ())
624+ .metaData (MetaData .builder ().put (indexMetaData , false ))
625+ .build ();
626+
627+ // initialize the previous state, so all the allocation ids are loaded
628+ when (watcherService .validate (anyObject ())).thenReturn (true );
629+ lifeCycleService .clusterChanged (new ClusterChangedEvent ("whatever" , previousState , currentState ));
630+
631+ reset (watcherService );
632+ when (watcherService .validate (anyObject ())).thenReturn (true );
633+ ClusterChangedEvent event = new ClusterChangedEvent ("whatever" , currentState , previousState );
634+ lifeCycleService .clusterChanged (event );
635+ verify (watcherService ).reload (eq (event .state ()), anyString ());
636+ }
637+
638+ private void startWatcher () {
567639 Index index = new Index (Watch .INDEX , "uuid" );
568640 IndexRoutingTable .Builder indexRoutingTableBuilder = IndexRoutingTable .builder (index );
569641 indexRoutingTableBuilder .addShard (
@@ -593,12 +665,10 @@ private ClusterState startWatcher() {
593665 lifeCycleService .clusterChanged (new ClusterChangedEvent ("foo" , state , emptyState ));
594666 assertThat (lifeCycleService .getState (), is (WatcherState .STARTED ));
595667 verify (watcherService , times (1 )).reload (eq (state ), anyString ());
596- assertThat (lifeCycleService .allocationIds (), hasSize (1 ));
668+ assertThat (lifeCycleService .shardRoutings (), hasSize (1 ));
597669
598670 // reset the mock, the user has to mock everything themselves again
599671 reset (watcherService );
600-
601- return state ;
602672 }
603673
604674 private List <String > randomIndexPatterns () {
0 commit comments