2828import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2929import org .elasticsearch .cluster .ClusterState ;
3030import org .elasticsearch .cluster .ClusterStateObserver ;
31+ import org .elasticsearch .cluster .ClusterStateUpdateTask ;
3132import org .elasticsearch .cluster .SnapshotDeletionsInProgress ;
3233import org .elasticsearch .cluster .SnapshotsInProgress ;
3334import org .elasticsearch .cluster .metadata .IndexMetadata ;
8283
8384import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
8485import static org .hamcrest .Matchers .empty ;
85- import static org .hamcrest .Matchers .equalTo ;
86+ import static org .hamcrest .Matchers .hasSize ;
8687import static org .hamcrest .Matchers .is ;
8788
8889public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
@@ -122,11 +123,11 @@ public void verifyNoLeakedListeners() throws Exception {
122123 @ After
123124 public void assertRepoConsistency () {
124125 if (skipRepoConsistencyCheckReason == null ) {
125- client (). admin (). cluster ().prepareGetRepositories ().get ().repositories ().forEach (repositoryMetadata -> {
126+ clusterAdmin ().prepareGetRepositories ().get ().repositories ().forEach (repositoryMetadata -> {
126127 final String name = repositoryMetadata .name ();
127128 if (repositoryMetadata .settings ().getAsBoolean ("readonly" , false ) == false ) {
128- client (). admin (). cluster ().prepareDeleteSnapshot (name , OLD_VERSION_SNAPSHOT_PREFIX + "*" ).get ();
129- client (). admin (). cluster ().prepareCleanupRepository (name ).get ();
129+ clusterAdmin ().prepareDeleteSnapshot (name , OLD_VERSION_SNAPSHOT_PREFIX + "*" ).get ();
130+ clusterAdmin ().prepareCleanupRepository (name ).get ();
130131 }
131132 BlobStoreTestUtil .assertRepoConsistency (internalCluster (), name );
132133 });
@@ -202,12 +203,10 @@ public static void waitForBlock(String node, String repository, TimeValue timeou
202203 public SnapshotInfo waitForCompletion (String repository , String snapshotName , TimeValue timeout ) throws InterruptedException {
203204 long start = System .currentTimeMillis ();
204205 while (System .currentTimeMillis () - start < timeout .millis ()) {
205- List <SnapshotInfo > snapshotInfos = client ().admin ().cluster ().prepareGetSnapshots (repository ).setSnapshots (snapshotName )
206- .get ().getSnapshots (repository );
207- assertThat (snapshotInfos .size (), equalTo (1 ));
208- if (snapshotInfos .get (0 ).state ().completed ()) {
206+ final SnapshotInfo snapshotInfo = getSnapshot (repository , snapshotName );
207+ if (snapshotInfo .state ().completed ()) {
209208 // Make sure that snapshot clean up operations are finished
210- ClusterStateResponse stateResponse = client (). admin (). cluster ().prepareState ().get ();
209+ ClusterStateResponse stateResponse = clusterAdmin ().prepareState ().get ();
211210 boolean found = false ;
212211 for (SnapshotsInProgress .Entry entry :
213212 stateResponse .getState ().custom (SnapshotsInProgress .TYPE , SnapshotsInProgress .EMPTY ).entries ()) {
@@ -218,7 +217,7 @@ public SnapshotInfo waitForCompletion(String repository, String snapshotName, Ti
218217 }
219218 }
220219 if (found == false ) {
221- return snapshotInfos . get ( 0 ) ;
220+ return snapshotInfo ;
222221 }
223222 }
224223 Thread .sleep (100 );
@@ -307,7 +306,7 @@ public void unblockNode(final String repository, final String node) {
307306
308307 protected void createRepository (String repoName , String type , Settings .Builder settings ) {
309308 logger .info ("--> creating repository [{}] [{}]" , repoName , type );
310- assertAcked (client (). admin (). cluster ().preparePutRepository (repoName )
309+ assertAcked (clusterAdmin ().preparePutRepository (repoName )
311310 .setType (type ).setSettings (settings ));
312311 }
313312
@@ -349,7 +348,7 @@ protected void maybeInitWithOldSnapshotVersion(String repoName, Path repoPath) t
349348 protected String initWithSnapshotVersion (String repoName , Path repoPath , Version version ) throws IOException {
350349 assertThat ("This hack only works on an empty repository" , getRepositoryData (repoName ).getSnapshotIds (), empty ());
351350 final String oldVersionSnapshot = OLD_VERSION_SNAPSHOT_PREFIX + version .id ;
352- final CreateSnapshotResponse createSnapshotResponse = client (). admin (). cluster ()
351+ final CreateSnapshotResponse createSnapshotResponse = clusterAdmin ()
353352 .prepareCreateSnapshot (repoName , oldVersionSnapshot ).setIndices ("does-not-exist-for-sure-*" )
354353 .setWaitForCompletion (true ).get ();
355354 assertThat (createSnapshotResponse .getSnapshotInfo ().totalShards (), is (0 ));
@@ -372,7 +371,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version
372371
373372 protected SnapshotInfo createFullSnapshot (String repoName , String snapshotName ) {
374373 logger .info ("--> creating full snapshot [{}] in [{}]" , snapshotName , repoName );
375- CreateSnapshotResponse createSnapshotResponse = client (). admin (). cluster ().prepareCreateSnapshot (repoName , snapshotName )
374+ CreateSnapshotResponse createSnapshotResponse = clusterAdmin ().prepareCreateSnapshot (repoName , snapshotName )
376375 .setIncludeGlobalState (true )
377376 .setWaitForCompletion (true )
378377 .get ();
@@ -416,7 +415,7 @@ protected void assertDocCount(String index, long count) {
416415 * @param metadata snapshot metadata to write (as returned by {@link SnapshotInfo#userMetadata()})
417416 */
418417 protected void addBwCFailedSnapshot (String repoName , String snapshotName , Map <String , Object > metadata ) throws Exception {
419- final ClusterState state = client (). admin (). cluster ().prepareState ().get ().getState ();
418+ final ClusterState state = clusterAdmin ().prepareState ().get ().getState ();
420419 final RepositoriesMetadata repositoriesMetadata = state .metadata ().custom (RepositoriesMetadata .TYPE );
421420 assertNotNull (repositoriesMetadata );
422421 final RepositoryMetadata initialRepoMetadata = repositoriesMetadata .repository (repoName );
@@ -486,7 +485,7 @@ protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName
486485
487486 protected ActionFuture <CreateSnapshotResponse > startFullSnapshot (String repoName , String snapshotName , boolean partial ) {
488487 logger .info ("--> creating full snapshot [{}] to repo [{}]" , snapshotName , repoName );
489- return client (). admin (). cluster ().prepareCreateSnapshot (repoName , snapshotName ).setWaitForCompletion (true )
488+ return clusterAdmin ().prepareCreateSnapshot (repoName , snapshotName ).setWaitForCompletion (true )
490489 .setPartial (partial ).execute ();
491490 }
492491
@@ -517,6 +516,35 @@ protected void createIndexWithContent(String indexName, Settings indexSettings)
517516
518517 protected ActionFuture <AcknowledgedResponse > startDeleteSnapshot (String repoName , String snapshotName ) {
519518 logger .info ("--> deleting snapshot [{}] from repo [{}]" , snapshotName , repoName );
520- return client ().admin ().cluster ().prepareDeleteSnapshot (repoName , snapshotName ).execute ();
519+ return clusterAdmin ().prepareDeleteSnapshot (repoName , snapshotName ).execute ();
520+ }
521+
522+ protected void updateClusterState (final Function <ClusterState , ClusterState > updater ) throws Exception {
523+ final PlainActionFuture <Void > future = PlainActionFuture .newFuture ();
524+ final ClusterService clusterService = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class );
525+ clusterService .submitStateUpdateTask ("test" , new ClusterStateUpdateTask () {
526+ @ Override
527+ public ClusterState execute (ClusterState currentState ) {
528+ return updater .apply (currentState );
529+ }
530+
531+ @ Override
532+ public void onFailure (String source , Exception e ) {
533+ future .onFailure (e );
534+ }
535+
536+ @ Override
537+ public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
538+ future .onResponse (null );
539+ }
540+ });
541+ future .get ();
542+ }
543+
544+ protected SnapshotInfo getSnapshot (String repository , String snapshot ) {
545+ final List <SnapshotInfo > snapshotInfos = clusterAdmin ().prepareGetSnapshots (repository ).setSnapshots (snapshot )
546+ .get ().getSnapshots (repository );
547+ assertThat (snapshotInfos , hasSize (1 ));
548+ return snapshotInfos .get (0 );
521549 }
522550}
0 commit comments