1818 */
1919package org .elasticsearch .indices .state ;
2020
21- import org .elasticsearch .action .index . IndexResponse ;
21+ import org .elasticsearch .action .admin . cluster . reroute . ClusterRerouteRequest ;
2222import org .elasticsearch .action .support .master .AcknowledgedResponse ;
23- import org .elasticsearch .cluster .ClusterState ;
23+ import org .elasticsearch .cluster .node . DiscoveryNode ;
2424import org .elasticsearch .cluster .routing .IndexRoutingTable ;
2525import org .elasticsearch .cluster .routing .ShardRouting ;
26+ import org .elasticsearch .cluster .routing .allocation .command .AllocationCommands ;
2627import org .elasticsearch .cluster .routing .allocation .command .MoveAllocationCommand ;
28+ import org .elasticsearch .cluster .routing .allocation .decider .ConcurrentRebalanceAllocationDecider ;
2729import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider ;
30+ import org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider .Rebalance ;
31+ import org .elasticsearch .cluster .routing .allocation .decider .ThrottlingAllocationDecider ;
32+ import org .elasticsearch .cluster .service .ClusterService ;
2833import org .elasticsearch .common .settings .Settings ;
2934import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
30- import org .elasticsearch .rest .RestStatus ;
35+ import org .elasticsearch .indices .recovery .PeerRecoverySourceService ;
36+ import org .elasticsearch .indices .recovery .StartRecoveryRequest ;
37+ import org .elasticsearch .plugins .Plugin ;
3138import org .elasticsearch .test .BackgroundIndexer ;
3239import org .elasticsearch .test .ESIntegTestCase ;
40+ import org .elasticsearch .test .junit .annotations .TestLogging ;
41+ import org .elasticsearch .test .transport .MockTransportService ;
42+ import org .elasticsearch .transport .TransportService ;
3343
3444import java .util .ArrayList ;
45+ import java .util .Collection ;
3546import java .util .HashMap ;
3647import java .util .List ;
3748import java .util .Map ;
3849import java .util .Set ;
3950import java .util .concurrent .CountDownLatch ;
51+ import java .util .stream .Collectors ;
52+ import java .util .stream .IntStream ;
4053
41- import static org .elasticsearch .cluster .routing .allocation .decider .ConcurrentRebalanceAllocationDecider .CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING ;
42- import static org .elasticsearch .cluster .routing .allocation .decider .EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING ;
43- import static org .elasticsearch .cluster .routing .allocation .decider .ThrottlingAllocationDecider .CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING ;
54+ import static java .util .Collections .singletonList ;
4455import static org .elasticsearch .indices .state .CloseIndexIT .assertException ;
4556import static org .elasticsearch .indices .state .CloseIndexIT .assertIndexIsClosed ;
4657import static org .elasticsearch .indices .state .CloseIndexIT .assertIndexIsOpened ;
5061@ ESIntegTestCase .ClusterScope (minNumDataNodes = 2 )
5162public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
5263
64+ @ Override
65+ protected Collection <Class <? extends Plugin >> nodePlugins () {
66+ return singletonList (MockTransportService .TestPlugin .class );
67+ }
68+
5369 @ Override
5470 protected Settings nodeSettings (int nodeOrdinal ) {
5571 return Settings .builder ()
5672 .put (super .nodeSettings (nodeOrdinal ))
57- .put (CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey (), 10 )
58- .put (CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING .getKey (), -1 )
73+ .put (ThrottlingAllocationDecider . CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING .getKey (), Integer . MAX_VALUE )
74+ .put (ConcurrentRebalanceAllocationDecider . CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING .getKey (), -1 )
5975 .build ();
6076 }
6177
6278 @ Override
63- protected int numberOfReplicas () {
64- return 1 ;
79+ protected int maximumNumberOfShards () {
80+ return 3 ;
6581 }
6682
67- @ AwaitsFix ( bugUrl = "https://github.com/elastic/elasticsearch/issues/37274" )
83+ @ TestLogging ( "org.elasticsearch.cluster.metadata.MetaDataIndexStateService:DEBUG,org.elasticsearch.action.admin.indices.close:DEBUG" )
6884 public void testCloseWhileRelocatingShards () throws Exception {
69- final String [] indices = new String [randomIntBetween (1 , 3 )];
85+ final String [] indices = new String [randomIntBetween (3 , 5 )];
7086 final Map <String , Long > docsPerIndex = new HashMap <>();
87+ final Map <String , BackgroundIndexer > indexers = new HashMap <>();
7188
7289 for (int i = 0 ; i < indices .length ; i ++) {
73- final String indexName = "index-" + i ;
74- createIndex (indexName );
75-
90+ final String indexName = "index-" + i ;
7691 int nbDocs = 0 ;
77- if (randomBoolean ()) {
78- nbDocs = randomIntBetween (1 , 20 );
79- for (int j = 0 ; j < nbDocs ; j ++) {
80- IndexResponse indexResponse = client ().prepareIndex (indexName , "_doc" ).setSource ("num" , j ).get ();
81- assertEquals (RestStatus .CREATED , indexResponse .status ());
82- }
92+ switch (i ) {
93+ case 0 :
94+ logger .debug ("creating empty index {}" , indexName );
95+ createIndex (indexName );
96+ break ;
97+ case 1 :
98+ nbDocs = scaledRandomIntBetween (1 , 100 );
99+ logger .debug ("creating index {} with {} documents" , indexName , nbDocs );
100+ createIndex (indexName );
101+ indexRandom (randomBoolean (), IntStream .range (0 , nbDocs )
102+ .mapToObj (n -> client ().prepareIndex (indexName , "_doc" ).setSource ("num" , n ))
103+ .collect (Collectors .toList ()));
104+ break ;
105+ default :
106+ logger .debug ("creating index {} with background indexing" , indexName );
107+ final BackgroundIndexer indexer = new BackgroundIndexer (indexName , "_doc" , client (), -1 , 1 );
108+ indexers .put (indexName , indexer );
109+ waitForDocs (1 , indexer );
83110 }
84111 docsPerIndex .put (indexName , (long ) nbDocs );
85112 indices [i ] = indexName ;
@@ -88,60 +115,72 @@ public void testCloseWhileRelocatingShards() throws Exception {
88115 ensureGreen (indices );
89116 assertAcked (client ().admin ().cluster ().prepareUpdateSettings ()
90117 .setTransientSettings (Settings .builder ()
91- .put (CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey (), EnableAllocationDecider . Rebalance .NONE .toString ())));
118+ .put (EnableAllocationDecider . CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey (), Rebalance .NONE .toString ())));
92119
93- // start some concurrent indexing threads
94- final Map <String , BackgroundIndexer > indexers = new HashMap <>();
95- for (final String index : indices ) {
96- if (randomBoolean ()) {
97- final BackgroundIndexer indexer = new BackgroundIndexer (index , "_doc" , client (), -1 , scaledRandomIntBetween (1 , 3 ));
98- waitForDocs (1 , indexer );
99- indexers .put (index , indexer );
100- }
101- }
120+ final String targetNode = internalCluster ().startDataOnlyNode ();
121+ ensureClusterSizeConsistency (); // wait for the master to finish processing join.
102122
103123 final Set <String > acknowledgedCloses = ConcurrentCollections .newConcurrentSet ();
104- final String newNode = internalCluster ().startDataOnlyNode ();
105124 try {
106- final CountDownLatch latch = new CountDownLatch (1 );
107- final List <Thread > threads = new ArrayList <>();
108-
109- // start shards relocating threads
110- final ClusterState clusterState = client ().admin ().cluster ().prepareState ().get ().getState ();
111- for (final String indexToRelocate : indices ) {
112- final IndexRoutingTable indexRoutingTable = clusterState .routingTable ().index (indexToRelocate );
113- for (int i = 0 ; i < getNumShards (indexToRelocate ).numPrimaries ; i ++) {
114- final int shardId = i ;
115- ShardRouting primary = indexRoutingTable .shard (shardId ).primaryShard ();
116- assertTrue (primary .started ());
117- ShardRouting replica = indexRoutingTable .shard (shardId ).replicaShards ().iterator ().next ();
125+ final ClusterService clusterService = internalCluster ().getInstance (ClusterService .class , internalCluster ().getMasterName ());
126+ final CountDownLatch latch = new CountDownLatch (indices .length );
127+ final CountDownLatch release = new CountDownLatch (1 );
128+
129+ // relocate one shard for every index to be closed
130+ final AllocationCommands commands = new AllocationCommands ();
131+ for (final String index : indices ) {
132+ final NumShards numShards = getNumShards (index );
133+ final int shardId = numShards .numPrimaries == 1 ? 0 : randomIntBetween (0 , numShards .numPrimaries - 1 );
134+ final IndexRoutingTable indexRoutingTable = clusterService .state ().routingTable ().index (index );
135+
136+ final ShardRouting primary = indexRoutingTable .shard (shardId ).primaryShard ();
137+ assertTrue (primary .started ());
138+
139+ String currentNodeId = primary .currentNodeId ();
140+ if (numShards .numReplicas > 0 ) {
141+ final ShardRouting replica = indexRoutingTable .shard (shardId ).replicaShards ().iterator ().next ();
118142 assertTrue (replica .started ());
143+ if (randomBoolean ()) {
144+ currentNodeId = replica .currentNodeId ();
145+ }
146+ }
119147
120- final String currentNodeId = randomBoolean () ? primary .currentNodeId () : replica .currentNodeId ();
121- assertNotNull (currentNodeId );
122-
123- final Thread thread = new Thread (() -> {
124- try {
125- latch .await ();
126- } catch (InterruptedException e ) {
127- throw new AssertionError (e );
148+ final DiscoveryNode sourceNode = clusterService .state ().nodes ().resolveNode (primary .currentNodeId ());
149+ ((MockTransportService ) internalCluster ().getInstance (TransportService .class , targetNode ))
150+ .addSendBehavior (internalCluster ().getInstance (TransportService .class , sourceNode .getName ()),
151+ (connection , requestId , action , request , options ) -> {
152+ if (PeerRecoverySourceService .Actions .START_RECOVERY .equals (action )) {
153+ logger .debug ("blocking recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
154+ latch .countDown ();
155+ try {
156+ release .await ();
157+ logger .debug ("releasing recovery of shard {}" , ((StartRecoveryRequest ) request ).shardId ());
158+ } catch (InterruptedException e ) {
159+ throw new AssertionError (e );
160+ }
161+ }
162+ connection .sendRequest (requestId , action , request , options );
128163 }
129- assertAcked (client ().admin ().cluster ().prepareReroute ()
130- .add (new MoveAllocationCommand (indexToRelocate , shardId , currentNodeId , newNode )));
131- });
132- threads .add (thread );
133- thread .start ();
134- }
164+ );
165+ commands .add (new MoveAllocationCommand (index , shardId , currentNodeId , targetNode ));
135166 }
136167
168+ assertAcked (client ().admin ().cluster ().reroute (new ClusterRerouteRequest ().commands (commands )).get ());
169+
137170 // start index closing threads
171+ final List <Thread > threads = new ArrayList <>();
138172 for (final String indexToClose : indices ) {
139173 final Thread thread = new Thread (() -> {
140174 try {
141175 latch .await ();
142176 } catch (InterruptedException e ) {
143177 throw new AssertionError (e );
178+ } finally {
179+ release .countDown ();
144180 }
181+ // Closing is not always acknowledged when shards are relocating: this is the case when the target shard is initializing
182+ // or is still catching up on operations. In these cases the TransportVerifyShardBeforeCloseAction will detect that the global
183+ // checkpoint and the max sequence number don't match and will not ack the close.
145184 AcknowledgedResponse closeResponse = client ().admin ().indices ().prepareClose (indexToClose ).get ();
146185 if (closeResponse .isAcknowledged ()) {
147186 assertTrue ("Index closing should not be acknowledged twice" , acknowledgedCloses .add (indexToClose ));
@@ -155,6 +194,7 @@ public void testCloseWhileRelocatingShards() throws Exception {
155194 for (Thread thread : threads ) {
156195 thread .join ();
157196 }
197+
158198 for (Map .Entry <String , BackgroundIndexer > entry : indexers .entrySet ()) {
159199 final BackgroundIndexer indexer = entry .getValue ();
160200 indexer .setAssertNoFailuresOnStop (false );
@@ -172,7 +212,8 @@ public void testCloseWhileRelocatingShards() throws Exception {
172212 }
173213 } finally {
174214 assertAcked (client ().admin ().cluster ().prepareUpdateSettings ()
175- .setTransientSettings (Settings .builder ().putNull (CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey ())));
215+ .setTransientSettings (Settings .builder ()
216+ .putNull (EnableAllocationDecider .CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING .getKey ())));
176217 }
177218
178219 for (String index : indices ) {
0 commit comments