diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index f798d1ec4222f..df85f0483d67a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -902,7 +902,7 @@ private String createExpiredData(String jobId) throws Exception { // b) is slightly more efficient since we may not need to wait an entire second for the timestamp to increment assertBusy(() -> { long timeNow = System.currentTimeMillis() / 1000; - assertFalse(prevJobTimeStamp >= timeNow); + assertThat(prevJobTimeStamp, lessThan(timeNow)); }); // Update snapshot timestamp to force it out of snapshot retention window @@ -920,7 +920,8 @@ private String createExpiredData(String jobId) throws Exception { waitForForecastToComplete(jobId, forecastJobResponse.getForecastId()); // Wait for the forecast to expire - awaitBusy(() -> false, 1, TimeUnit.SECONDS); + // FIXME: We should wait for something specific to change, rather than waiting for time to pass. + waitUntil(() -> false, 1, TimeUnit.SECONDS); // Run up to now startDatafeed(datafeedId, String.valueOf(0), String.valueOf(nowMillis)); @@ -964,7 +965,9 @@ public void testDeleteExpiredData() throws Exception { assertTrue(response.getDeleted()); - awaitBusy(() -> false, 1, TimeUnit.SECONDS); + // Wait for the forecast to expire + // FIXME: We should wait for something specific to change, rather than waiting for time to pass. + waitUntil(() -> false, 1, TimeUnit.SECONDS); GetModelSnapshotsRequest getModelSnapshotsRequest1 = new GetModelSnapshotsRequest(jobId); GetModelSnapshotsResponse getModelSnapshotsResponse1 = execute(getModelSnapshotsRequest1, machineLearningClient::getModelSnapshots, @@ -2079,8 +2082,6 @@ private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT); } - - private String createAndPutDatafeed(String jobId, String indexName) throws IOException { String datafeedId = jobId + "-feed"; DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId) diff --git a/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java b/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java index ccd319fdc7719..20b25c805aca1 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java +++ b/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java @@ -200,7 +200,7 @@ public void testUpdateByQuery() { } } - public void testTasks() throws InterruptedException { + public void testTasks() throws Exception { final Client client = client(); final ReindexRequestBuilder builder = reindexAndPartiallyBlock(); @@ -284,7 +284,7 @@ public void onFailure(Exception e) { * Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state * @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher) */ - private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedException { + private ReindexRequestBuilder reindexAndPartiallyBlock() throws Exception { final Client client = client(); final int numDocs = randomIntBetween(10, 100); ALLOWED_OPERATIONS.release(numDocs); @@ -310,9 +310,12 @@ private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedExcep builder.execute(); // 10 seconds is usually fine but on heavily loaded machines this can take a while - assertTrue("updates blocked", awaitBusy( - () -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0, - 1, TimeUnit.MINUTES)); + assertBusy( + () -> { + assertTrue("Expected some queued threads", ALLOWED_OPERATIONS.hasQueuedThreads()); + assertEquals("Expected that no permits are available", 0, ALLOWED_OPERATIONS.availablePermits()); + }, + 1, TimeUnit.MINUTES); return builder; } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java index aa3c4470672b3..1096bb08fc349 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -118,10 +118,9 @@ private void testCancel(String action, AbstractBulkByScrollRequestBuilder * exhausted their slice while others might have quite a bit left * to work on. We can't control that. */ logger.debug("waiting for updates to be blocked"); - boolean blocked = awaitBusy( - () -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0, + assertBusy( + () -> assertTrue("updates blocked", ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0), 1, TimeUnit.MINUTES); // 10 seconds is usually fine but on heavily loaded machines this can take a while - assertTrue("updates blocked", blocked); // Status should show the task running TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices()); diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/SearchRestCancellationIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/SearchRestCancellationIT.java index 698f42c43ca03..475176eb31eae 100644 --- a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/SearchRestCancellationIT.java +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/SearchRestCancellationIT.java @@ -240,7 +240,7 @@ public Map, Object>> pluginScripts() { LogManager.getLogger(SearchRestCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id")); hits.incrementAndGet(); try { - awaitBusy(() -> shouldBlock.get() == false); + waitUntil(() -> shouldBlock.get() == false); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index ab1cbfce4103e..03fa65fde4f6a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -167,7 +167,7 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task) // Simulate a job that takes forever to finish // Using periodic checks method to identify that the task was cancelled try { - awaitBusy(() -> { + waitUntil(() -> { if (((CancellableTask) task).isCancelled()) { throw new TaskCancelledException("Cancelled"); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index ba93c41806e51..f280a3dc36ad5 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -75,7 +75,7 @@ import java.util.Objects; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; -import static org.elasticsearch.test.ESTestCase.awaitBusy; +import static org.elasticsearch.test.ESTestCase.waitUntil; /** * A plugin that adds a cancellable blocking test task of integration testing of the task manager. @@ -305,7 +305,7 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) { logger.info("Test task started on the node {}", clusterService.localNode()); if (request.shouldBlock) { try { - awaitBusy(() -> { + waitUntil(() -> { if (((CancellableTask) task).isCancelled()) { throw new RuntimeException("Cancelled!"); } diff --git a/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index a78bd4a67ece4..44179569b3142 100644 --- a/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -128,10 +128,11 @@ public void testTwoNodesNoMasterBlock() throws Exception { Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNode)); - awaitBusy(() -> { + assertBusy(() -> { ClusterState clusterState = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - return clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID); + assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID)); }); + state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true)); // verify that both nodes are still in the cluster state but there is no master diff --git a/server/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java b/server/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java index 1555e9fd4cfd2..ea06ec040c083 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/NoMasterNodeIT.java @@ -219,11 +219,10 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception { final Client clientToMasterlessNode = client(); - assertTrue(awaitBusy(() -> { - ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState(); - return state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID); - } - )); + assertBusy(() -> { + ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState(); + assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID)); + }); GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "type1", "1").get(); assertExists(getResponse); diff --git a/server/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java index edcf4446dc2bf..a528670ef000e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/allocation/AwarenessAllocationIT.java @@ -38,9 +38,11 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @ClusterScope(scope= ESIntegTestCase.Scope.TEST, numDataNodes =0, minNumDataNodes = 2) @@ -78,40 +80,43 @@ public void testSimpleAwareness() throws Exception { final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_2").build()); // On slow machines the initial relocation might be delayed - assertThat(awaitBusy( - () -> { - logger.info("--> waiting for no relocation"); - ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() - .setIndices("test1", "test2") - .setWaitForEvents(Priority.LANGUID) - .setWaitForGreenStatus() - .setWaitForNodes("3") - .setWaitForNoRelocatingShards(true) - .get(); - if (clusterHealth.isTimedOut()) { - return false; - } - - logger.info("--> checking current state"); - ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); - // check that closed indices are effectively closed - if (indicesToClose.stream().anyMatch(index -> clusterState.metaData().index(index).getState() != State.CLOSE)) { - return false; - } - // verify that we have all the primaries on node3 - ObjectIntHashMap counts = new ObjectIntHashMap<>(); - for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - for (ShardRouting shardRouting : indexShardRoutingTable) { - counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1); - } + assertBusy( + () -> { + logger.info("--> waiting for no relocation"); + ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth() + .setIndices("test1", "test2") + .setWaitForEvents(Priority.LANGUID) + .setWaitForGreenStatus() + .setWaitForNodes("3") + .setWaitForNoRelocatingShards(true) + .get(); + + assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false)); + + logger.info("--> checking current state"); + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + + // check that closed indices are effectively closed + final List notClosedIndices = + indicesToClose.stream() + .filter(index -> clusterState.metaData().index(index).getState() != State.CLOSE) + .collect(Collectors.toList()); + assertThat("Some indices not closed", notClosedIndices, empty()); + + // verify that we have all the primaries on node3 + ObjectIntHashMap counts = new ObjectIntHashMap<>(); + for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + for (ShardRouting shardRouting : indexShardRoutingTable) { + counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1); } } - return counts.get(node3) == totalPrimaries; - }, - 10, - TimeUnit.SECONDS - ), equalTo(true)); + } + assertThat(counts.get(node3), equalTo(totalPrimaries)); + }, + 10, + TimeUnit.SECONDS + ); } public void testAwarenessZones() { diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 0e732d5fb0502..0a5f71427dac1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -400,8 +400,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS block1.countDown(); invoked2.await(); - // whenever we test for no tasks, we need to awaitBusy since this is a live node - assertTrue(awaitBusy(() -> clusterService.getMasterService().pendingTasks().isEmpty())); + // whenever we test for no tasks, we need to wait since this is a live node + assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty())); waitNoPendingTasksOnAll(); final CountDownLatch block2 = new CountDownLatch(1); diff --git a/server/src/test/java/org/elasticsearch/discovery/MasterDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/MasterDisruptionIT.java index 8f422537b384f..eac7ae3737fb9 100644 --- a/server/src/test/java/org/elasticsearch/discovery/MasterDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/MasterDisruptionIT.java @@ -282,21 +282,17 @@ public void testMappingTimeout() throws Exception { } - private void assertDiscoveryCompleted(List nodes) throws InterruptedException { + private void assertDiscoveryCompleted(List nodes) throws Exception { for (final String node : nodes) { - assertTrue( - "node [" + node + "] is still joining master", - awaitBusy( - () -> { - final Discovery discovery = internalCluster().getInstance(Discovery.class, node); - if (discovery instanceof ZenDiscovery) { - return !((ZenDiscovery) discovery).joiningCluster(); - } - return true; - }, - 30, - TimeUnit.SECONDS - ) + assertBusy( + () -> { + final Discovery discovery = internalCluster().getInstance(Discovery.class, node); + if (discovery instanceof ZenDiscovery) { + assertFalse("node [" + node + "] is still joining master", ((ZenDiscovery) discovery).joiningCluster()); + } + }, + 30, + TimeUnit.SECONDS ); } } diff --git a/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java index 2a38942eb5a22..b8b29dc722dfe 100644 --- a/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java @@ -328,7 +328,7 @@ public void run() { entry = simulations.get(nodeId); if (entry == null) { // we are simulating a master node switch, wait for it to not be null - awaitBusy(() -> simulations.containsKey(nodeId)); + assertBusy(() -> assertTrue(simulations.containsKey(nodeId))); } assert entry != null; entry.executeLatch.await(); diff --git a/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java index fba9a005c539d..29ad07bae2fa0 100644 --- a/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java @@ -68,13 +68,15 @@ public void testQuorumRecovery() throws Exception { @Override public void doAfterNodes(int numNodes, final Client activeClient) throws Exception { if (numNodes == 1) { - assertTrue(awaitBusy(() -> { + assertBusy(() -> { logger.info("--> running cluster_health (wait for the shards to startup)"); ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest() .waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet(); logger.info("--> done cluster_health, status {}", clusterHealth.getStatus()); - return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW; - }, 30, TimeUnit.SECONDS)); + assertFalse(clusterHealth.isTimedOut()); + assertEquals(ClusterHealthStatus.YELLOW, clusterHealth.getStatus()); + }, 30, TimeUnit.SECONDS); + logger.info("--> one node is closed -- index 1 document into the remaining nodes"); activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3") .endObject()).get(); diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index afa74512e316a..0bec748072251 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -71,7 +71,7 @@ public static CompressedXContent filter(QueryBuilder filterBuilder) throws IOExc return new CompressedXContent(Strings.toString(builder)); } - public void testBaseAsyncTask() throws InterruptedException, IOException { + public void testBaseAsyncTask() throws Exception { IndexService indexService = createIndex("test", Settings.EMPTY); AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); @@ -127,7 +127,7 @@ protected void runInternal() { // now close the index final Index index = indexService.index(); assertAcked(client().admin().indices().prepareClose(index.getName())); - awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index))); final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(indexService, closedIndexService); @@ -137,7 +137,7 @@ protected void runInternal() { // now reopen the index assertAcked(client().admin().indices().prepareOpen(index.getName())); - awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index))); indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(closedIndexService, indexService); @@ -205,7 +205,7 @@ public void testRefreshTaskIsUpdated() throws Exception { // now close the index final Index index = indexService.index(); assertAcked(client().admin().indices().prepareClose(index.getName())); - awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index))); final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(indexService, closedIndexService); @@ -216,7 +216,7 @@ public void testRefreshTaskIsUpdated() throws Exception { // now reopen the index assertAcked(client().admin().indices().prepareOpen(index.getName())); - awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index))); indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(closedIndexService, indexService); refreshTask = indexService.getRefreshTask(); @@ -242,7 +242,7 @@ public void testFsyncTaskIsRunning() throws Exception { // now close the index final Index index = indexService.index(); assertAcked(client().admin().indices().prepareClose(index.getName())); - awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index))); final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(indexService, closedIndexService); @@ -253,7 +253,7 @@ public void testFsyncTaskIsRunning() throws Exception { // now reopen the index assertAcked(client().admin().indices().prepareOpen(index.getName())); - awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index)); + assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index))); indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index); assertNotSame(closedIndexService, indexService); fsyncTask = indexService.getFsyncTask(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java index a072ca880e01e..328efd8174615 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java @@ -27,19 +27,18 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.hamcrest.Matchers; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, scope = Scope.SUITE) public class InternalEngineMergeIT extends ESIntegTestCase { - public void testMergesHappening() throws InterruptedException, IOException, ExecutionException { + public void testMergesHappening() throws Exception { final int numOfShards = randomIntBetween(1, 5); // some settings to keep num segments low assertAcked(prepareCreate("test").setSettings(Settings.builder() @@ -66,21 +65,24 @@ public void testMergesHappening() throws InterruptedException, IOException, Exec stats.getPrimaries().getMerge().getCurrent()); } final long upperNumberSegments = 2 * numOfShards * 10; - awaitBusy(() -> { + + assertBusy(() -> { IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get(); logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent()); long current = stats.getPrimaries().getMerge().getCurrent(); long count = stats.getPrimaries().getSegments().getCount(); - return count < upperNumberSegments && current == 0; + assertThat(count, lessThan(upperNumberSegments)); + assertThat(current, equalTo(0L)); }); + IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).setMerge(true).get(); logger.info("numshards {}, segments {}, total merges {}, current merge {}", numOfShards, stats.getPrimaries().getSegments().getCount(), stats.getPrimaries().getMerge().getTotal(), stats.getPrimaries().getMerge().getCurrent()); long count = stats.getPrimaries().getSegments().getCount(); - assertThat(count, Matchers.lessThanOrEqualTo(upperNumberSegments)); + assertThat(count, lessThanOrEqualTo(upperNumberSegments)); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 76ca0c66a372d..b0ed3361bcea3 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -619,7 +619,7 @@ public void indexTranslogOperations( phaseTwoStartLatch.countDown(); // wait for the translog phase to complete and the recovery to block global checkpoint advancement - awaitBusy(() -> shards.getPrimary().pendingInSync()); + assertBusy(() -> assertTrue(shards.getPrimary().pendingInSync())); { shards.index(new IndexRequest(index.getName(), "type", "last").source("{}", XContentType.JSON)); final long expectedDocs = docs + 3L; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index e7d68baf26599..ccd150f33ed14 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -184,7 +184,7 @@ public void testUpdateGlobalCheckpointOnReplica() { assertThat(updatedGlobalCheckpoint.get(), equalTo(update)); } - public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, InterruptedException { + public void testMarkAllocationIdAsInSync() throws Exception { final long initialClusterStateVersion = randomNonNegativeLong(); Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); Set active = new HashSet<>(activeWithCheckpoints.keySet()); @@ -212,7 +212,7 @@ public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, Interr }); thread.start(); barrier.await(); - awaitBusy(tracker::pendingInSync); + assertBusy(() -> assertTrue(tracker.pendingInSync())); final long updatedLocalCheckpoint = randomLongBetween(1 + localCheckpoint, Long.MAX_VALUE); // there is a shard copy pending in sync, the global checkpoint can not advance updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 416e717099023..24809e5753564 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -495,13 +495,14 @@ public void onFailure(Exception e) { * permits to the semaphore. We wait here until all generic threads are idle as an indication that all permits have been returned to * the semaphore. */ - awaitBusy(() -> { + assertBusy(() -> { for (final ThreadPoolStats.Stats stats : threadPool.stats()) { if (ThreadPool.Names.GENERIC.equals(stats.getName())) { - return stats.getActive() == 0; + assertThat("Expected no active threads in GENERIC pool", stats.getActive(), equalTo(0)); + return; } } - return false; + fail("Failed to find stats for the GENERIC thread pool"); }); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 4120a79b48aa9..fc1bec7eb60f6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3363,7 +3363,7 @@ public void testIsSearchIdle() throws Exception { closeShards(primary); } - public void testScheduledRefresh() throws IOException, InterruptedException { + public void testScheduledRefresh() throws Exception { Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -3388,7 +3388,7 @@ public void testScheduledRefresh() throws IOException, InterruptedException { assertFalse(primary.scheduledRefresh()); assertEquals(lastSearchAccess, primary.getLastSearcherAccess()); // wait until the thread-pool has moved the timestamp otherwise we can't assert on this below - awaitBusy(() -> primary.getThreadPool().relativeTimeInMillis() > lastSearchAccess); + assertBusy(() -> assertThat(primary.getThreadPool().relativeTimeInMillis(), greaterThan(lastSearchAccess))); CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { primary.awaitShardSearchActive(refreshed -> { diff --git a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java index a9ac976d45ae8..74d72edf71f31 100644 --- a/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/server/src/test/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -282,17 +282,20 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted client().admin().indices().prepareUpdateSettings("test").setSettings(build).get(); client().admin().cluster().prepareReroute().get(); - boolean didClusterTurnRed = awaitBusy(() -> { + boolean didClusterTurnRed = waitUntil(() -> { ClusterHealthStatus test = client().admin().cluster() .health(Requests.clusterHealthRequest("test")).actionGet().getStatus(); return test == ClusterHealthStatus.RED; }, 5, TimeUnit.MINUTES);// sometimes on slow nodes the replication / recovery is just dead slow + final ClusterHealthResponse response = client().admin().cluster() .health(Requests.clusterHealthRequest("test")).get(); + if (response.getStatus() != ClusterHealthStatus.RED) { logger.info("Cluster turned red in busy loop: {}", didClusterTurnRed); logger.info("cluster state:\n{}\n{}", - client().admin().cluster().prepareState().get().getState(), client().admin().cluster().preparePendingClusterTasks().get()); + client().admin().cluster().prepareState().get().getState(), + client().admin().cluster().preparePendingClusterTasks().get()); } assertThat(response.getStatus(), is(ClusterHealthStatus.RED)); ClusterState state = client().admin().cluster().prepareState().get().getState(); diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java index cb0206144e86c..5d95bb36e9e0c 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -51,7 +52,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BooleanSupplier; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -224,25 +224,23 @@ public void testIndexStateShardChanged() throws Throwable { private static void assertShardStatesMatch(final IndexShardStateChangeListener stateChangeListener, final int numShards, final IndexShardState... shardStates) - throws InterruptedException { + throws Exception { + CheckedRunnable waitPredicate = () -> { + assertEquals(stateChangeListener.shardStates.size(), numShards); - BooleanSupplier waitPredicate = () -> { - if (stateChangeListener.shardStates.size() != numShards) { - return false; - } for (List indexShardStates : stateChangeListener.shardStates.values()) { - if (indexShardStates == null || indexShardStates.size() != shardStates.length) { - return false; - } + assertNotNull(indexShardStates); + assertThat(indexShardStates.size(), equalTo(shardStates.length)); + for (int i = 0; i < shardStates.length; i++) { - if (indexShardStates.get(i) != shardStates[i]) { - return false; - } + assertThat(indexShardStates.get(i), equalTo(shardStates[i])); } } - return true; }; - if (!awaitBusy(waitPredicate, 1, TimeUnit.MINUTES)) { + + try { + assertBusy(waitPredicate, 1, TimeUnit.MINUTES); + } catch (AssertionError ae) { fail("failed to observe expect shard states\n" + "expected: [" + numShards + "] shards with states: " + Strings.arrayToCommaDelimitedString(shardStates) + "\n" + "observed:\n" + stateChangeListener); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index e7277450e6b3f..d3110461e37a2 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -135,6 +135,7 @@ import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.empty; @ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class IndexRecoveryIT extends ESIntegTestCase { @@ -789,9 +790,11 @@ public void sendRequest(Transport.Connection connection, long requestId, String if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action) && count.incrementAndGet() == 1) { // ensures that it's considered as valid recovery attempt by source try { - awaitBusy(() -> client(blueNodeName).admin().cluster().prepareState().setLocal(true).get() - .getState().getRoutingTable().index("test").shard(0).getAllInitializingShards().isEmpty() == false); - } catch (InterruptedException e) { + assertBusy(() -> assertThat( + "Expected there to be some initializing shards", + client(blueNodeName).admin().cluster().prepareState().setLocal(true).get() + .getState().getRoutingTable().index("test").shard(0).getAllInitializingShards(), not(empty()))); + } catch (Exception e) { throw new RuntimeException(e); } connection.sendRequest(requestId, action, request, options); diff --git a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java index f388e49b31fb2..9ae87b6835004 100644 --- a/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java +++ b/server/src/test/java/org/elasticsearch/indices/store/IndicesStoreIntegrationIT.java @@ -145,8 +145,8 @@ public void testIndexCleanup() throws Exception { .get(); assertThat(clusterHealth.isTimedOut(), equalTo(false)); - assertThat(waitForShardDeletion(node_1, index, 0), equalTo(false)); - assertThat(waitForIndexDeletion(node_1, index), equalTo(false)); + assertShardDeleted(node_1, index, 0); + assertIndexDeleted(node_1, index); assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(true)); assertThat(Files.exists(indexDirectory(node_2, index)), equalTo(true)); assertThat(Files.exists(shardDirectory(node_3, index, 0)), equalTo(true)); @@ -240,12 +240,13 @@ public void testShardCleanupIfShardDeletionAfterRelocationFailedAndIndexDeleted( // it must still delete the shard, even if it cannot find it anymore in indicesservice client().admin().indices().prepareDelete("test").get(); - assertThat(waitForShardDeletion(node_1, index, 0), equalTo(false)); - assertThat(waitForIndexDeletion(node_1, index), equalTo(false)); + assertShardDeleted(node_1, index, 0); + assertIndexDeleted(node_1, index); assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(false)); assertThat(Files.exists(indexDirectory(node_1, index)), equalTo(false)); - assertThat(waitForShardDeletion(node_2, index, 0), equalTo(false)); - assertThat(waitForIndexDeletion(node_2, index), equalTo(false)); + + assertShardDeleted(node_2, index, 0); + assertIndexDeleted(node_2, index); assertThat(Files.exists(shardDirectory(node_2, index, 0)), equalTo(false)); assertThat(Files.exists(indexDirectory(node_2, index)), equalTo(false)); } @@ -277,7 +278,7 @@ public void testShardsCleanup() throws Exception { assertThat(clusterHealth.isTimedOut(), equalTo(false)); logger.info("--> making sure that shard is not allocated on server3"); - assertThat(waitForShardDeletion(node_3, index, 0), equalTo(false)); + assertShardDeleted(node_3, index, 0); Path server2Shard = shardDirectory(node_2, index, 0); logger.info("--> stopping node {}", node_2); @@ -308,7 +309,7 @@ public void testShardsCleanup() throws Exception { logger.info("--> making sure that shard and its replica are allocated on server1 and server3 but not on server2"); assertThat(Files.exists(shardDirectory(node_1, index, 0)), equalTo(true)); assertThat(Files.exists(shardDirectory(node_3, index, 0)), equalTo(true)); - assertThat(waitForShardDeletion(node_4, index, 0), equalTo(false)); + assertShardDeleted(node_4, index, 0); } public void testShardActiveElsewhereDoesNotDeleteAnother() throws Exception { @@ -453,7 +454,7 @@ public void onFailure(String source, Exception e) { waitNoPendingTasksOnAll(); logger.info("Checking if shards aren't removed"); for (int shard : node2Shards) { - assertTrue(waitForShardDeletion(nonMasterNode, index, shard)); + assertShardExists(nonMasterNode, index, shard); } } @@ -471,13 +472,18 @@ private Path shardDirectory(String server, Index index, int shard) { return paths[0]; } - private boolean waitForShardDeletion(final String server, final Index index, final int shard) throws InterruptedException { - awaitBusy(() -> !Files.exists(shardDirectory(server, index, shard))); - return Files.exists(shardDirectory(server, index, shard)); + private void assertShardDeleted(final String server, final Index index, final int shard) throws Exception { + final Path path = shardDirectory(server, index, shard); + assertBusy(() -> assertFalse("Expected shard to not exist: " + path, Files.exists(path))); + } + + private void assertShardExists(final String server, final Index index, final int shard) throws Exception { + final Path path = shardDirectory(server, index, shard); + assertBusy(() -> assertTrue("Expected shard to exist: " + path, Files.exists(path))); } - private boolean waitForIndexDeletion(final String server, final Index index) throws InterruptedException { - awaitBusy(() -> !Files.exists(indexDirectory(server, index))); - return Files.exists(indexDirectory(server, index)); + private void assertIndexDeleted(final String server, final Index index) throws Exception { + final Path path = indexDirectory(server, index); + assertBusy(() -> assertFalse("Expected index to be deleted: " + path, Files.exists(path))); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index b92b3bb764399..0e02a2c52c13c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -75,7 +75,7 @@ import static java.util.Objects.requireNonNull; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -import static org.elasticsearch.test.ESTestCase.awaitBusy; +import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -335,10 +335,15 @@ protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Pe AtomicInteger phase = new AtomicInteger(); while (true) { // wait for something to happen - assertTrue(awaitBusy(() -> testTask.isCancelled() || - testTask.getOperation() != null || - clusterService.lifecycleState() != Lifecycle.State.STARTED, // speedup finishing on closed nodes - 45, TimeUnit.SECONDS)); // This can take a while during large cluster restart + try { + assertBusy(() -> assertTrue(testTask.isCancelled() || + testTask.getOperation() != null || + clusterService.lifecycleState() != Lifecycle.State.STARTED), // speedup finishing on closed nodes + 45, TimeUnit.SECONDS); // This can take a while during large cluster restart + } catch (Exception ex) { + throw new RuntimeException(ex); + } + if (clusterService.lifecycleState() != Lifecycle.State.STARTED) { return; } diff --git a/server/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java b/server/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java index 9e5a170b6b334..aca6b1184f7b4 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RecoveryWhileUnderLoadIT.java @@ -362,23 +362,23 @@ private void iterateAssertCount(final int numberOfShards, final int iterations, //if there was an error we try to wait and see if at some point it'll get fixed logger.info("--> trying to wait"); - assertTrue(awaitBusy(() -> { - boolean errorOccurred = false; - for (int i = 0; i < iterations; i++) { - SearchResponse searchResponse = client().prepareSearch() - .setTrackTotalHits(true) - .setSize(0) - .setQuery(matchAllQuery()) - .get(); - if (searchResponse.getHits().getTotalHits().value != numberOfDocs) { - errorOccurred = true; - } - } - return !errorOccurred; - }, - 5, - TimeUnit.MINUTES - ) + assertBusy( + () -> { + boolean errorOccurred = false; + for (int i = 0; i < iterations; i++) { + SearchResponse searchResponse = client().prepareSearch() + .setTrackTotalHits(true) + .setSize(0) + .setQuery(matchAllQuery()) + .get(); + if (searchResponse.getHits().getTotalHits().value != numberOfDocs) { + errorOccurred = true; + } + } + assertFalse("An error occurred while waiting", errorOccurred); + }, + 5, + TimeUnit.MINUTES ); assertEquals(numberOfDocs, ids.size()); } diff --git a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java index 81bd844c8fbf6..6da30714c7509 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/server/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -272,7 +272,7 @@ public Map, Object>> pluginScripts() { LogManager.getLogger(SearchCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id")); hits.incrementAndGet(); try { - awaitBusy(() -> shouldBlock.get() == false); + assertBusy(() -> assertFalse(shouldBlock.get())); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/server/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java b/server/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java index 817e922da0cde..d831bce704c54 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java +++ b/server/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java @@ -41,7 +41,7 @@ public Settings nodeSettings(int nodeOrdinal) { .build(); } - public void testOpenContextsAfterRejections() throws InterruptedException { + public void testOpenContextsAfterRejections() throws Exception { createIndex("test"); ensureGreen("test"); final int docs = scaledRandomIntBetween(20, 50); @@ -68,10 +68,8 @@ public void testOpenContextsAfterRejections() throws InterruptedException { } catch (Exception t) { } } - awaitBusy( - () -> client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts() == 0, - 1, TimeUnit.SECONDS); - indicesStats = client().admin().indices().prepareStats().get(); - assertThat(indicesStats.getTotal().getSearch().getOpenContexts(), equalTo(0L)); + assertBusy( + () -> assertThat(client().admin().indices().prepareStats().get().getTotal().getSearch().getOpenContexts(), equalTo(0L)), + 1, TimeUnit.SECONDS); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 2e7ef6b382336..6663ea68a97bb 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -208,17 +208,17 @@ public static void unblockAllDataNodes(String repository) { } public void waitForBlockOnAnyDataNode(String repository, TimeValue timeout) throws InterruptedException { - if (false == awaitBusy(() -> { - for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + final boolean blocked = waitUntil(() -> { + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); if (mockRepository.blocked()) { return true; } } return false; - }, timeout.millis(), TimeUnit.MILLISECONDS)) { - fail("Timeout waiting for repository block on any data node!!!"); - } + }, timeout.millis(), TimeUnit.MILLISECONDS); + + assertTrue("No repository is blocked waiting on a data node", blocked); } public static void unblockNode(final String repository, final String node) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index be3f24f41c4a2..0caf08bb3fd72 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1214,7 +1214,7 @@ public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Except logger.info("--> wait for the index to appear"); // that would mean that recovery process started and failing - assertThat(waitForIndex("test-idx", TimeValue.timeValueSeconds(10)), equalTo(true)); + waitForIndex("test-idx", TimeValue.timeValueSeconds(10)); logger.info("--> delete index"); cluster().wipeIndices("test-idx"); @@ -2095,6 +2095,7 @@ public void testSnapshotStatus() throws Exception { assertEquals("test-snap", response.getSnapshots().get(0).getSnapshot().getSnapshotId().getName()); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/46276") public void testSnapshotRelocatingPrimary() throws Exception { Client client = client(); logger.info("--> creating repository"); @@ -2619,9 +2620,14 @@ public void testDeleteSnapshotWhileRestoringFails() throws Exception { restoreFut.get(); } - private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException { - return awaitBusy(() -> client().admin().indices().prepareExists(index).execute().actionGet().isExists(), - timeout.millis(), TimeUnit.MILLISECONDS); + private void waitForIndex(final String index, TimeValue timeout) throws Exception { + assertBusy( + () -> { + boolean exists = client().admin().indices().prepareExists(index).execute().actionGet().isExists(); + assertTrue("Expected index [" + index + "] to exist", exists); + }, + timeout.millis(), + TimeUnit.MILLISECONDS); } public void testSnapshotName() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java index 785552124ea26..eb52b6334a7f9 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScheduleWithFixedDelayTests.java @@ -178,7 +178,7 @@ public void testCancellingRunnable() throws Exception { // rarely wait and make sure the runnable didn't run at the next interval if (rarely()) { - assertFalse(awaitBusy(runAfterDone::get, 1L, TimeUnit.SECONDS)); + assertBusy(() -> assertFalse("Runnable was run after being cancelled", runAfterDone.get()), 1L, TimeUnit.SECONDS); } } @@ -283,10 +283,10 @@ public void testRunnableDoesNotRunAfterCancellation() throws Exception { assertThat(counterValue, equalTo(iterations)); if (rarely()) { - awaitBusy(() -> { - final int value = counter.get(); - return value == iterations; - }, 5 * interval.millis(), TimeUnit.MILLISECONDS); + assertBusy( + () -> assertThat(counter.get(), equalTo(iterations)), + 5 * interval.millis(), + TimeUnit.MILLISECONDS); } } diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java index 586303c23c4b8..8a0fbe0a4958a 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java @@ -21,7 +21,6 @@ import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.SeedUtils; - import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountables; import org.apache.lucene.util.BytesRef; @@ -29,7 +28,6 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.test.ESTestCase; import java.util.Collection; import java.util.Collections; @@ -41,6 +39,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.test.ESTestCase.assertBusy; +import static org.junit.Assert.assertTrue; + public class MockBigArrays extends BigArrays { /** @@ -57,8 +58,9 @@ public static void ensureAllArraysAreReleased() throws Exception { // not empty, we might be executing on a shared cluster that keeps on obtaining // and releasing arrays, lets make sure that after a reasonable timeout, all master // copy (snapshot) have been released - boolean success = ESTestCase.awaitBusy(() -> Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_ARRAYS.keySet())); - if (!success) { + try { + assertBusy(() -> assertTrue(Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_ARRAYS.keySet()))); + } catch (AssertionError ex) { masterCopy.keySet().retainAll(ACQUIRED_ARRAYS.keySet()); ACQUIRED_ARRAYS.keySet().removeAll(masterCopy.keySet()); // remove all existing master copy we will report on if (!masterCopy.isEmpty()) { diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java b/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java index c202688892963..1bb432360313a 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/MockPageCacheRecycler.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.recycler.Recycler.V; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.test.ESTestCase; import java.lang.reflect.Array; import java.util.Arrays; @@ -34,6 +33,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static org.elasticsearch.test.ESTestCase.waitUntil; + public class MockPageCacheRecycler extends PageCacheRecycler { private static final ConcurrentMap ACQUIRED_PAGES = new ConcurrentHashMap<>(); @@ -44,8 +45,8 @@ public static void ensureAllPagesAreReleased() throws Exception { // not empty, we might be executing on a shared cluster that keeps on obtaining // and releasing pages, lets make sure that after a reasonable timeout, all master // copy (snapshot) have been released - boolean success = - ESTestCase.awaitBusy(() -> Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_PAGES.keySet())); + final boolean success = + waitUntil(() -> Sets.haveEmptyIntersection(masterCopy.keySet(), ACQUIRED_PAGES.keySet())); if (!success) { masterCopy.keySet().retainAll(ACQUIRED_PAGES.keySet()); ACQUIRED_PAGES.keySet().removeAll(masterCopy.keySet()); // remove all existing master copy we will report on diff --git a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java index d46c09e12621d..e1fa74c5b3176 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java +++ b/test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java @@ -1,4 +1,4 @@ -package org.elasticsearch.test;/* +/* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright @@ -17,6 +17,8 @@ * under the License. */ +package org.elasticsearch.test; + import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomStrings; diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 65ef8c53f0521..6f0f5566bd6ad 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -176,8 +176,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -200,6 +198,7 @@ import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.startsWith; @@ -967,67 +966,50 @@ public ClusterHealthStatus waitForRelocation(ClusterHealthStatus status) { * Waits until at least a give number of document is visible for searchers * * @param numDocs number of documents to wait for - * @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. If supplied it will be first checked for documents indexed. + * @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. It will be first checked for documents indexed. * This saves on unneeded searches. - * @return the actual number of docs seen. */ - public long waitForDocs(final long numDocs, @Nullable final BackgroundIndexer indexer) throws InterruptedException { + public void waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws Exception { // indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED. - return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer); - } + final long maxWaitTimeMs = Math.max(90 * 1000, 200 * numDocs); - /** - * Waits until at least a give number of document is visible for searchers - * - * @param numDocs number of documents to wait for - * @param maxWaitTime if not progress have been made during this time, fail the test - * @param maxWaitTimeUnit the unit in which maxWaitTime is specified - * @param indexer If supplied it will be first checked for documents indexed. - * This saves on unneeded searches. - * @return the actual number of docs seen. - */ - public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, @Nullable final BackgroundIndexer indexer) - throws InterruptedException { - final AtomicLong lastKnownCount = new AtomicLong(-1); - long lastStartCount = -1; - BooleanSupplier testDocs = () -> { - if (indexer != null) { - lastKnownCount.set(indexer.totalIndexedDocs()); - } - if (lastKnownCount.get() >= numDocs) { - try { - - long count = client().prepareSearch() - .setTrackTotalHits(true) - .setSize(0) - .setQuery(matchAllQuery()) - .get() - .getHits().getTotalHits().value; - - if (count == lastKnownCount.get()) { - // no progress - try to refresh for the next time - client().admin().indices().prepareRefresh().get(); + assertBusy( + () -> { + long lastKnownCount = indexer.totalIndexedDocs(); + + if (lastKnownCount >= numDocs) { + try { + long count = client().prepareSearch() + .setTrackTotalHits(true) + .setSize(0) + .setQuery(matchAllQuery()) + .get() + .getHits().getTotalHits().value; + + if (count == lastKnownCount) { + // no progress - try to refresh for the next time + client().admin().indices().prepareRefresh().get(); + } + lastKnownCount = count; + } catch (Exception e) { // count now acts like search and barfs if all shards failed... + logger.debug("failed to executed count", e); + throw e; } - lastKnownCount.set(count); - } catch (Exception e) { // count now acts like search and barfs if all shards failed... - logger.debug("failed to executed count", e); - return false; } - logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); - } else { - logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs); - } - return lastKnownCount.get() >= numDocs; - }; - while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { - if (lastStartCount == lastKnownCount.get()) { - // we didn't make any progress - fail("failed to reach " + numDocs + "docs"); - } - lastStartCount = lastKnownCount.get(); - } - return lastKnownCount.get(); + if (logger.isDebugEnabled()) { + if (lastKnownCount < numDocs) { + logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount, numDocs); + } else { + logger.debug("[{}] docs visible for search (needed [{}])", lastKnownCount, numDocs); + } + } + + assertThat(lastKnownCount, greaterThanOrEqualTo(numDocs)); + }, + maxWaitTimeMs, + TimeUnit.MILLISECONDS + ); } /** diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index b783e340beda2..7ccdb0c36bf95 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -871,6 +871,7 @@ public static void assertBusy(CheckedRunnable codeBlock) throws Excep */ public static void assertBusy(CheckedRunnable codeBlock, long maxWaitTime, TimeUnit unit) throws Exception { long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit); + // In case you've forgotten your high-school studies, log10(x) / log10(y) == log y(x) long iterations = Math.max(Math.round(Math.log10(maxTimeInMillis) / Math.log10(2)), 1); long timeInMillis = 1; long sum = 0; @@ -898,14 +899,34 @@ public static void assertBusy(CheckedRunnable codeBlock, long maxWait } } - public static boolean awaitBusy(BooleanSupplier breakSupplier) throws InterruptedException { - return awaitBusy(breakSupplier, 10, TimeUnit.SECONDS); + /** + * Periodically execute the supplied function until it returns true, or a timeout + * is reached. This version uses a timeout of 10 seconds. If at all possible, + * use {@link ESTestCase#assertBusy(CheckedRunnable)} instead. + * + * @param breakSupplier determines whether to return immediately or continue waiting. + * @return the last value returned by breakSupplier + * @throws InterruptedException if any sleep calls were interrupted. + */ + public static boolean waitUntil(BooleanSupplier breakSupplier) throws InterruptedException { + return waitUntil(breakSupplier, 10, TimeUnit.SECONDS); } // After 1s, we stop growing the sleep interval exponentially and just sleep 1s until maxWaitTime private static final long AWAIT_BUSY_THRESHOLD = 1000L; - public static boolean awaitBusy(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) throws InterruptedException { + /** + * Periodically execute the supplied function until it returns true, or until the + * specified maximum wait time has elapsed. If at all possible, use + * {@link ESTestCase#assertBusy(CheckedRunnable)} instead. + * + * @param breakSupplier determines whether to return immediately or continue waiting. + * @param maxWaitTime the maximum amount of time to wait + * @param unit the unit of tie for maxWaitTime + * @return the last value returned by breakSupplier + * @throws InterruptedException if any sleep calls were interrupted. + */ + public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) throws InterruptedException { long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit); long timeInMillis = 1; long sum = 0; diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 869b4313bafbc..f5e1535a2a4c5 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -155,13 +155,13 @@ import static org.elasticsearch.discovery.FileBasedSeedHostsProvider.UNICAST_HOSTS_FILE; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; -import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.getTestTransportType; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.assertEquals; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertFalse; @@ -1252,22 +1252,17 @@ private synchronized void validateClusterFormed(String viaNode) { logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes); final Client client = client(viaNode); try { - if (awaitBusy(() -> { + assertBusy(() -> { DiscoveryNodes discoveryNodes = client.admin().cluster().prepareState().get().getState().nodes(); - if (discoveryNodes.getSize() != expectedNodes.size()) { - return false; - } + assertEquals(expectedNodes.size(), discoveryNodes.getSize()); for (DiscoveryNode expectedNode : expectedNodes) { - if (discoveryNodes.nodeExists(expectedNode) == false) { - return false; - } + assertTrue("Expected node to exist: " + expectedNode, discoveryNodes.nodeExists(expectedNode)); } - return true; - }, 30, TimeUnit.SECONDS) == false) { - throw new IllegalStateException("cluster failed to form with expected nodes " + expectedNodes + " and actual nodes " + - client.admin().cluster().prepareState().get().getState().nodes()); - } - } catch (InterruptedException e) { + }, 30, TimeUnit.SECONDS); + } catch (AssertionError ae) { + throw new IllegalStateException("cluster failed to form with expected nodes " + expectedNodes + " and actual nodes " + + client.admin().cluster().prepareState().get().getState().nodes()); + } catch (Exception e) { throw new IllegalStateException(e); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index eba3532f063bf..6d2507c97efe1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -105,8 +105,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -618,62 +616,48 @@ protected void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index fol * @param numDocs number of documents to wait for * @param indexer a {@link org.elasticsearch.test.BackgroundIndexer}. Will be first checked for documents indexed. * This saves on unneeded searches. - * @return the actual number of docs seen. */ - public long waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws InterruptedException { + public void waitForDocs(final long numDocs, final BackgroundIndexer indexer) throws Exception { // indexing threads can wait for up to ~1m before retrying when they first try to index into a shard which is not STARTED. - return waitForDocs(numDocs, 90, TimeUnit.SECONDS, indexer); - } + final long maxWaitTimeMs = Math.max(90 * 1000, 200 * numDocs); + + assertBusy( + () -> { + long lastKnownCount = indexer.totalIndexedDocs(); + + if (lastKnownCount >= numDocs) { + try { + long count = indexer.getClient().prepareSearch() + .setTrackTotalHits(true) + .setSize(0) + .setQuery(QueryBuilders.matchAllQuery()) + .get() + .getHits().getTotalHits().value; + + if (count == lastKnownCount) { + // no progress - try to refresh for the next time + indexer.getClient().admin().indices().prepareRefresh().get(); + } + lastKnownCount = count; + } catch (Exception e) { // count now acts like search and barfs if all shards failed... + logger.debug("failed to executed count", e); + throw e; + } + } - /** - * Waits until at least a give number of document is visible for searchers - * - * @param numDocs number of documents to wait for - * @param maxWaitTime if not progress have been made during this time, fail the test - * @param maxWaitTimeUnit the unit in which maxWaitTime is specified - * @param indexer Will be first checked for documents indexed. - * This saves on unneeded searches. - * @return the actual number of docs seen. - */ - public long waitForDocs(final long numDocs, int maxWaitTime, TimeUnit maxWaitTimeUnit, final BackgroundIndexer indexer) - throws InterruptedException { - final AtomicLong lastKnownCount = new AtomicLong(-1); - long lastStartCount = -1; - BooleanSupplier testDocs = () -> { - lastKnownCount.set(indexer.totalIndexedDocs()); - if (lastKnownCount.get() >= numDocs) { - try { - long count = indexer.getClient().prepareSearch() - .setTrackTotalHits(true) - .setSize(0) - .setQuery(QueryBuilders.matchAllQuery()) - .get() - .getHits().getTotalHits().value; - - if (count == lastKnownCount.get()) { - // no progress - try to refresh for the next time - indexer.getClient().admin().indices().prepareRefresh().get(); + if (logger.isDebugEnabled()) { + if (lastKnownCount < numDocs) { + logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount, numDocs); + } else { + logger.debug("[{}] docs visible for search (needed [{}])", lastKnownCount, numDocs); } - lastKnownCount.set(count); - } catch (Exception e) { // count now acts like search and barfs if all shards failed... - logger.debug("failed to executed count", e); - return false; } - logger.debug("[{}] docs visible for search. waiting for [{}]", lastKnownCount.get(), numDocs); - } else { - logger.debug("[{}] docs indexed. waiting for [{}]", lastKnownCount.get(), numDocs); - } - return lastKnownCount.get() >= numDocs; - }; - while (!awaitBusy(testDocs, maxWaitTime, maxWaitTimeUnit)) { - if (lastStartCount == lastKnownCount.get()) { - // we didn't make any progress - fail("failed to reach " + numDocs + "docs"); - } - lastStartCount = lastKnownCount.get(); - } - return lastKnownCount.get(); + assertThat(lastKnownCount, greaterThanOrEqualTo(numDocs)); + }, + maxWaitTimeMs, + TimeUnit.MILLISECONDS + ); } protected ActionListener waitForRestore( diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicensesIntegrationTestCase.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicensesIntegrationTestCase.java index 9696ca6e7fde7..80697d254f8a8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicensesIntegrationTestCase.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/AbstractLicensesIntegrationTestCase.java @@ -97,16 +97,15 @@ public void onFailure(String source, @Nullable Exception e) { latch.await(); } - protected void assertLicenseActive(boolean active) throws InterruptedException { - boolean success = awaitBusy(() -> { + protected void assertLicenseActive(boolean active) throws Exception { + assertBusy(() -> { for (XPackLicenseState licenseState : internalCluster().getDataNodeInstances(XPackLicenseState.class)) { if (licenseState.isActive() == active) { - return true; + return; } } - return false; + fail("No data nodes have a license active state of [" + active + "]"); }); - assertTrue(success); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseServiceClusterTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseServiceClusterTests.java index 00d1c47cdedaa..2e79073af2c8d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseServiceClusterTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseServiceClusterTests.java @@ -174,16 +174,15 @@ public void testClusterRestartWithOldSignature() throws Exception { assertLicenseActive(true); } - private void assertOperationMode(License.OperationMode operationMode) throws InterruptedException { - boolean success = awaitBusy(() -> { + private void assertOperationMode(License.OperationMode operationMode) throws Exception { + assertBusy(() -> { for (XPackLicenseState licenseState : internalCluster().getDataNodeInstances(XPackLicenseState.class)) { if (licenseState.getOperationMode() == operationMode) { - return true; + return; } } - return false; + fail("No data nodes found with operation mode [" + operationMode + "]"); }); - assertTrue(success); } private void writeCloudInternalMode(String mode) throws Exception { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java index 423ae1297a5ab..487bf5a7a434e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java @@ -331,9 +331,10 @@ public void testStateMachine() throws Exception { assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); - assertTrue(awaitBusy(() -> indexer.getPosition() == 2)); + assertBusy(() -> assertThat(indexer.getPosition(), equalTo(2))); + countDownLatch.countDown(); - assertTrue(awaitBusy(() -> isFinished.get())); + assertBusy(() -> assertTrue(isFinished.get())); assertThat(indexer.getPosition(), equalTo(3)); assertFalse(isStopped.get()); @@ -347,24 +348,24 @@ public void testStateMachine() throws Exception { } } - public void testStateMachineBrokenSearch() throws InterruptedException { + public void testStateMachineBrokenSearch() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); - try { + try { MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2); indexer.start(); + assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS)); + assertBusy(() -> assertTrue(isFinished.get()), 10000, TimeUnit.SECONDS); assertThat(indexer.getStep(), equalTo(3)); - } finally { executor.shutdownNow(); } } - public void testStop_WhileIndexing() throws InterruptedException { + public void testStop_WhileIndexing() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { @@ -378,14 +379,14 @@ public void testStop_WhileIndexing() throws InterruptedException { countDownLatch.countDown(); assertThat(indexer.getPosition(), equalTo(2)); - assertTrue(awaitBusy(() -> isStopped.get())); + assertBusy(() -> assertTrue(isStopped.get())); assertFalse(isFinished.get()); } finally { executor.shutdownNow(); } } - public void testFiveRuns() throws InterruptedException { + public void testFiveRuns() throws Exception { AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); final ExecutorService executor = Executors.newFixedThreadPool(1); try { @@ -393,7 +394,7 @@ public void testFiveRuns() throws InterruptedException { indexer.start(); assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); - assertTrue(awaitBusy(() -> isFinished.get())); + assertBusy(() -> assertTrue(isFinished.get())); indexer.assertCounters(); } finally { executor.shutdownNow(); diff --git a/x-pack/plugin/ilm/qa/with-security/src/test/java/org/elasticsearch/xpack/security/PermissionsIT.java b/x-pack/plugin/ilm/qa/with-security/src/test/java/org/elasticsearch/xpack/security/PermissionsIT.java index 62c140bce7951..b773f0288a4d4 100644 --- a/x-pack/plugin/ilm/qa/with-security/src/test/java/org/elasticsearch/xpack/security/PermissionsIT.java +++ b/x-pack/plugin/ilm/qa/with-security/src/test/java/org/elasticsearch/xpack/security/PermissionsIT.java @@ -245,7 +245,7 @@ public void testCanViewExplainOnUnmanagedIndex() throws Exception { */ @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/41440") public void testWhenUserLimitedByOnlyAliasOfIndexCanWriteToIndexWhichWasRolledoverByILMPolicy() - throws IOException, InterruptedException { + throws Exception { /* * Setup: * - ILM policy to rollover index when max docs condition is met @@ -264,33 +264,24 @@ public void testWhenUserLimitedByOnlyAliasOfIndexCanWriteToIndexWhichWasRolledov refresh("foo_alias"); // wait so the ILM policy triggers rollover action, verify that the new index exists - assertThat(awaitBusy(() -> { + assertBusy(() -> { Request request = new Request("HEAD", "/" + "foo-logs-000002"); - int status; - try { - status = adminClient().performRequest(request).getStatusLine().getStatusCode(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return status == 200; - }), is(true)); + int status = adminClient().performRequest(request).getStatusLine().getStatusCode(); + assertThat(status, equalTo(200)); + }); // test_user: index docs using alias, now should be able write to new index indexDocs("test_user", "x-pack-test-password", "foo_alias", 1); refresh("foo_alias"); // verify that the doc has been indexed into new write index - awaitBusy(() -> { + assertBusy(() -> { Request request = new Request("GET", "/foo-logs-000002/_search"); - Response response; - try { - response = adminClient().performRequest(request); - try (InputStream content = response.getEntity().getContent()) { - Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); - return ((Integer) XContentMapValues.extractValue("hits.total.value", map)) == 1; - } - } catch (IOException e) { - throw new RuntimeException(e); + Response response = adminClient().performRequest(request); + try (InputStream content = response.getEntity().getContent()) { + Map map = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); + Integer totalHits = (Integer) XContentMapValues.extractValue("hits.total.value", map); + assertThat(totalHits, equalTo(1)); } }); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 5753e8acc205b..66fae8e458789 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -168,7 +168,8 @@ public void testDeleteExpiredData() throws Exception { client().admin().indices().prepareRefresh("*").get(); // We need to wait a second to ensure the second time around model snapshots will have a different ID (it depends on epoch seconds) - awaitBusy(() -> false, 1, TimeUnit.SECONDS); + // FIXME it would be better to wait for something concrete instead of wait for time to elapse + assertBusy(() -> {}, 1, TimeUnit.SECONDS); for (Job.Builder job : getJobs()) { // Run up to now diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java index 9512522709521..55211010f97fb 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RevertModelSnapshotIT.java @@ -69,7 +69,7 @@ public void test() throws Exception { assertThat(revertPointBucket.isInterim(), is(true)); // We need to wait a second to ensure the second time around model snapshot will have a different ID (it depends on epoch seconds) - awaitBusy(() -> false, 1, TimeUnit.SECONDS); + waitUntil(() -> false, 1, TimeUnit.SECONDS); openJob(job.getId()); postData(job.getId(), generateData(startTime + 10 * bucketSpan.getMillis(), bucketSpan, 10, Arrays.asList("foo", "bar"), diff --git a/x-pack/plugin/ml/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java b/x-pack/plugin/ml/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java index 454a3eb06e5a7..a70c0ef0af725 100644 --- a/x-pack/plugin/ml/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java +++ b/x-pack/plugin/ml/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/ml/transforms/PainlessDomainSplitIT.java @@ -315,12 +315,12 @@ public void testHRDSplit() throws Exception { client().performRequest(createFeedRequest); client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "datafeeds/hrd-split-datafeed/_start")); - boolean passed = awaitBusy(() -> { - try { + try { + assertBusy(() -> { client().performRequest(new Request("POST", "/_refresh")); Response response = client().performRequest(new Request("GET", - MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/results/records")); + MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/results/records")); String responseBody = EntityUtils.toString(response.getEntity()); if (responseBody.contains("\"count\":2")) { @@ -339,27 +339,19 @@ public void testHRDSplit() throws Exception { // domainSplit() tests had subdomain, testHighestRegisteredDomainCases() do not if (test.subDomainExpected != null) { assertThat("Expected subdomain [" + test.subDomainExpected + "] but found [" + actualSubDomain - + "]. Actual " + actualTotal + " vs Expected " + expectedTotal, actualSubDomain, - equalTo(test.subDomainExpected)); + + "]. Actual " + actualTotal + " vs Expected " + expectedTotal, actualSubDomain, + equalTo(test.subDomainExpected)); } assertThat("Expected domain [" + test.domainExpected + "] but found [" + actualDomain + "]. Actual " - + actualTotal + " vs Expected " + expectedTotal, actualDomain, equalTo(test.domainExpected)); - - return true; + + actualTotal + " vs Expected " + expectedTotal, actualDomain, equalTo(test.domainExpected)); } else { logger.error(responseBody); - return false; + fail("Response body didn't contain [\"count\":2]"); } - - } catch (Exception e) { - logger.error(e.getMessage()); - return false; - } - - }, 5, TimeUnit.SECONDS); - - if (!passed) { + }, 5, TimeUnit.SECONDS); + } catch (Exception e) { + logger.error(e.getMessage()); fail("Anomaly records were not found within 5 seconds"); } } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 1ebc65ef83af9..1710d163e2c59 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -404,7 +404,7 @@ public void onFailure(Exception e) { } @SuppressWarnings("unchecked") - public void testTriggerWithoutHeaders() throws InterruptedException { + public void testTriggerWithoutHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); Client client = mock(Client.class); @@ -475,7 +475,7 @@ public void onFailure(Exception e) { fail("Should not have entered onFailure"); } }); - ESTestCase.awaitBusy(started::get); + assertBusy(() -> assertTrue(started.get())); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); @@ -484,11 +484,11 @@ public void onFailure(Exception e) { latch.countDown(); // Wait for the final persistent status to finish - ESTestCase.awaitBusy(finished::get); + assertBusy(() -> assertTrue(finished.get())); } @SuppressWarnings("unchecked") - public void testTriggerWithHeaders() throws InterruptedException { + public void testTriggerWithHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); Map headers = new HashMap<>(1); headers.put("es-security-runas-user", "foo"); @@ -565,7 +565,7 @@ public void onFailure(Exception e) { fail("Should not have entered onFailure"); } }); - ESTestCase.awaitBusy(started::get); + assertBusy(() -> assertTrue(started.get())); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); @@ -574,11 +574,11 @@ public void onFailure(Exception e) { latch.countDown(); // Wait for the final persistent status to finish - ESTestCase.awaitBusy(finished::get); + assertBusy(() -> assertTrue(finished.get())); } @SuppressWarnings("unchecked") - public void testSaveStateChangesIDScheme() throws InterruptedException { + public void testSaveStateChangesIDScheme() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); Map headers = new HashMap<>(1); headers.put("es-security-runas-user", "foo"); @@ -656,7 +656,7 @@ public void onFailure(Exception e) { fail("Should not have entered onFailure"); } }); - ESTestCase.awaitBusy(started::get); + assertBusy(() -> assertTrue(started.get())); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); @@ -665,7 +665,7 @@ public void onFailure(Exception e) { latch.countDown(); // Wait for the final persistent status to finish - ESTestCase.awaitBusy(finished::get); + assertBusy(() -> assertTrue(finished.get())); } public void testStopWhenStopped() throws InterruptedException { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java index 3cd118dc064fd..c0e7d5ced6ee9 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/license/LicensingTests.java @@ -129,7 +129,7 @@ protected int maxNumberOfNodes() { } @Before - public void resetLicensing() throws InterruptedException { + public void resetLicensing() throws Exception { enableLicensing(OperationMode.MISSING); } @@ -297,64 +297,50 @@ private static void assertElasticsearchSecurityException(ThrowingRunnable runnab assertThat(ee.status(), is(RestStatus.FORBIDDEN)); } - private void disableLicensing() throws InterruptedException { + private void disableLicensing() throws Exception { // This method first makes sure licensing is enabled everywhere so that we can execute // monitoring actions to ensure we have a stable cluster and only then do we disable. - // This is done in an await busy since there is a chance that the enabling of the license + // This is done in an assertBusy since there is a chance that the enabling of the license // is overwritten by some other cluster activity and the node throws an exception while we // wait for things to stabilize! - final boolean success = awaitBusy(() -> { - try { - for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { - if (licenseState.isAuthAllowed() == false) { - enableLicensing(OperationMode.BASIC); - break; - } + assertBusy(() -> { + for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { + if (licenseState.isAuthAllowed() == false) { + enableLicensing(OperationMode.BASIC); + break; } + } - ensureGreen(); - ensureClusterSizeConsistency(); - ensureClusterStateConsistency(); + ensureGreen(); + ensureClusterSizeConsistency(); + ensureClusterStateConsistency(); - // apply the disabling of the license once the cluster is stable - for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { - licenseState.update(OperationMode.BASIC, false, null); - } - } catch (Exception e) { - logger.error("Caught exception while disabling license", e); - return false; + // apply the disabling of the license once the cluster is stable + for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { + licenseState.update(OperationMode.BASIC, false, null); } - return true; }, 30L, TimeUnit.SECONDS); - assertTrue(success); } - private void enableLicensing(License.OperationMode operationMode) throws InterruptedException { + private void enableLicensing(License.OperationMode operationMode) throws Exception { // do this in an await busy since there is a chance that the enabling of the license is // overwritten by some other cluster activity and the node throws an exception while we // wait for things to stabilize! - final boolean success = awaitBusy(() -> { - try { - // first update the license so we can execute monitoring actions - for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { - licenseState.update(operationMode, true, null); - } + assertBusy(() -> { + // first update the license so we can execute monitoring actions + for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { + licenseState.update(operationMode, true, null); + } - ensureGreen(); - ensureClusterSizeConsistency(); - ensureClusterStateConsistency(); + ensureGreen(); + ensureClusterSizeConsistency(); + ensureClusterStateConsistency(); - // re-apply the update in case any node received an updated cluster state that triggered the license state - // to change - for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { - licenseState.update(operationMode, true, null); - } - } catch (Exception e) { - logger.error("Caught exception while enabling license", e); - return false; + // re-apply the update in case any node received an updated cluster state that triggered the license state + // to change + for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) { + licenseState.update(operationMode, true, null); } - return true; }, 30L, TimeUnit.SECONDS); - assertTrue(success); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java index 9e6e496fda37c..27581a82f0452 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyIntegTests.java @@ -80,7 +80,7 @@ public void waitForSecurityIndexWritable() throws Exception { } @After - public void wipeSecurityIndex() throws InterruptedException { + public void wipeSecurityIndex() throws Exception { // get the api key service and wait until api key expiration is not in progress! awaitApiKeysRemoverCompletion(); deleteSecurityIndex(); @@ -111,10 +111,9 @@ public String configUsersRoles() { "manage_own_api_key_role:user_with_manage_own_api_key_role\n"; } - private void awaitApiKeysRemoverCompletion() throws InterruptedException { + private void awaitApiKeysRemoverCompletion() throws Exception { for (ApiKeyService apiKeyService : internalCluster().getInstances(ApiKeyService.class)) { - final boolean done = awaitBusy(() -> apiKeyService.isExpirationInProgress() == false); - assertTrue(done); + assertBusy(() -> assertFalse(apiKeyService.isExpirationInProgress())); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RunAsIntegTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RunAsIntegTests.java index 6d5c6770bf2f5..c114f1ee8182b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RunAsIntegTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/RunAsIntegTests.java @@ -161,9 +161,7 @@ public void testEmptyUserImpersonationHeader() throws Exception { .put(SecurityField.USER_SETTING.getKey(), TRANSPORT_CLIENT_USER + ":" + SecuritySettingsSourceField.TEST_PASSWORD).build())) { //ensure the client can connect - awaitBusy(() -> { - return client.connectedNodes().size() > 0; - }); + assertBusy(() -> assertFalse(client.connectedNodes().isEmpty())); try { Map headers = new HashMap<>(); @@ -193,9 +191,7 @@ public void testNonExistentRunAsUser() throws Exception { .put(SecurityField.USER_SETTING.getKey(), TRANSPORT_CLIENT_USER + ":" + SecuritySettingsSourceField.TEST_PASSWORD).build())) { //ensure the client can connect - awaitBusy(() -> { - return client.connectedNodes().size() > 0; - }); + assertBusy(() -> assertFalse(client.connectedNodes().isEmpty())); try { Map headers = new HashMap<>(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java index 25af5b350397b..07b37c1bf6e70 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/TokenAuthIntegTests.java @@ -558,11 +558,10 @@ public void waitForSecurityIndexWritable() throws Exception { } @After - public void wipeSecurityIndex() throws InterruptedException { + public void wipeSecurityIndex() throws Exception { // get the token service and wait until token expiration is not in progress! for (TokenService tokenService : internalCluster().getInstances(TokenService.class)) { - final boolean done = awaitBusy(() -> tokenService.isExpirationInProgress() == false); - assertTrue(done); + assertBusy(() -> assertFalse(tokenService.isExpirationInProgress())); } super.deleteSecurityIndex(); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java index ff080be728db9..7c80de3ace4fe 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java @@ -320,7 +320,7 @@ public void run() { final List openedSockets = new ArrayList<>(); final List blacklistedAddress = new ArrayList<>(); try { - final boolean allSocketsOpened = awaitBusy(() -> { + final boolean allSocketsOpened = waitUntil(() -> { try { InetAddress[] allAddresses = InetAddressHelper.getAllAddresses(); if (serverAddress instanceof Inet4Address) { @@ -337,10 +337,7 @@ public void run() { final Socket socket = openMockSocket(serverAddress, serverPort, localAddress, portToBind); openedSockets.add(socket); logger.debug("opened socket [{}]", socket); - } catch (NoRouteToHostException e) { - logger.debug(new ParameterizedMessage("blacklisting address [{}] due to:", localAddress), e); - blacklistedAddress.add(localAddress); - } catch (ConnectException e) { + } catch (NoRouteToHostException | ConnectException e) { logger.debug(new ParameterizedMessage("blacklisting address [{}] due to:", localAddress), e); blacklistedAddress.add(localAddress); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java index 486b561a5cbe0..a542464faf2f1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/NativePrivilegeStoreTests.java @@ -283,7 +283,7 @@ public void testPutPrivileges() throws Exception { )); } - awaitBusy(() -> requests.size() > 0, 1, TimeUnit.SECONDS); + assertBusy(() -> assertFalse(requests.isEmpty()), 1, TimeUnit.SECONDS); assertThat(requests, iterableWithSize(1)); assertThat(requests.get(0), instanceOf(ClearRolesCacheRequest.class)); @@ -324,7 +324,7 @@ public void testDeletePrivileges() throws Exception { )); } - awaitBusy(() -> requests.size() > 0, 1, TimeUnit.SECONDS); + assertBusy(() -> assertFalse(requests.isEmpty()), 1, TimeUnit.SECONDS); assertThat(requests, iterableWithSize(1)); assertThat(requests.get(0), instanceOf(ClearRolesCacheRequest.class)); diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index 700d896f25389..3f133ee539e95 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -39,7 +39,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -49,6 +48,7 @@ import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue; import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -213,7 +213,7 @@ private void disableMonitoring() throws Exception { }, () -> "Exception when disabling monitoring"); - awaitBusy(() -> { + assertBusy(() -> { try { ClientYamlTestResponse response = callApi("xpack.usage", singletonMap("filter_path", "monitoring.enabled_exporters"), emptyList(), @@ -222,7 +222,7 @@ private void disableMonitoring() throws Exception { @SuppressWarnings("unchecked") final Map exporters = (Map) response.evaluate("monitoring.enabled_exporters"); if (exporters.isEmpty() == false) { - return false; + fail("Exporters were not found"); } final Map params = new HashMap<>(); @@ -237,7 +237,8 @@ private void disableMonitoring() throws Exception { final Map node = (Map) nodes.values().iterator().next(); final Number activeWrites = (Number) extractValue("thread_pool.write.active", node); - return activeWrites != null && activeWrites.longValue() == 0L; + assertNotNull(activeWrites); + assertThat(activeWrites, equalTo(0)); } catch (Exception e) { throw new ElasticsearchException("Failed to wait for monitoring exporters to stop:", e); } @@ -281,26 +282,15 @@ private void awaitCallApi(String apiName, Map params, List> bodies, CheckedFunction success, - Supplier error) throws Exception { - - AtomicReference exceptionHolder = new AtomicReference<>(); - awaitBusy(() -> { - try { - ClientYamlTestResponse response = callApi(apiName, params, bodies, getApiCallHeaders()); - if (response.getStatusCode() == HttpStatus.SC_OK) { - exceptionHolder.set(null); - return success.apply(response); - } - return false; - } catch (IOException e) { - exceptionHolder.set(e); - } - return false; - }); - - IOException exception = exceptionHolder.get(); - if (exception != null) { - throw new IllegalStateException(error.get(), exception); + Supplier error) { + try { + // The actual method call that sends the API requests returns a Future, but we immediately + // call .get() on it so there's no need for this method to do any other awaiting. + ClientYamlTestResponse response = callApi(apiName, params, bodies, getApiCallHeaders()); + assertEquals(response.getStatusCode(), HttpStatus.SC_OK); + success.apply(response); + } catch (Exception e) { + throw new IllegalStateException(error.get(), e); } } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index 528c49eb1dacf..fc082a6cf3949 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -195,7 +195,7 @@ public void setUpMocks() { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); } - public void testPageSizeAdapt() throws InterruptedException { + public void testPageSizeAdapt() throws Exception { Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000); TransformConfig config = new TransformConfig(randomAlphaOfLength(10), randomSourceConfig(), @@ -232,8 +232,9 @@ public void testPageSizeAdapt() throws InterruptedException { assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + latch.countDown(); - awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED))); long pageSizeAfterFirstReduction = indexer.getPageSize(); assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction)); assertThat(pageSizeAfterFirstReduction, greaterThan((long)TransformIndexer.MINIMUM_PAGE_SIZE)); @@ -245,8 +246,9 @@ public void testPageSizeAdapt() throws InterruptedException { assertThat(indexer.getState(), equalTo(IndexerState.STARTED)); assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis())); assertThat(indexer.getState(), equalTo(IndexerState.INDEXING)); + secondRunLatch.countDown(); - awaitBusy(() -> indexer.getState() == IndexerState.STOPPED); + assertBusy(() -> assertThat(indexer.getState(), equalTo(IndexerState.STARTED))); // assert that page size has been reduced again assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize())); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/activate/ActivateWatchTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/activate/ActivateWatchTests.java index 700cdf4e98b34..e25d1d2fb3f01 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/activate/ActivateWatchTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/transport/action/activate/ActivateWatchTests.java @@ -90,9 +90,10 @@ public void testDeactivateAndActivate() throws Exception { Thread.sleep(5000); refresh(); - long count2 = docCount(".watcher-history*", matchAllQuery()); - - assertThat(count2, is(count1)); + // Ensure no new watch history. The assertion ought to always return false, but if it returns true + // then we know that more history has been written. + boolean hasNewHistory = waitUntil(() -> count1 != docCount(".watcher-history*", matchAllQuery()), 5, TimeUnit.SECONDS); + assertFalse("Watcher should have stopped executing but new history found", hasNewHistory); // lets activate it again logger.info("Activating watch again"); diff --git a/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/SimpleKdcLdapServer.java b/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/SimpleKdcLdapServer.java index ec94af9b75fc4..3535dd3624614 100644 --- a/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/SimpleKdcLdapServer.java +++ b/x-pack/qa/evil-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/SimpleKdcLdapServer.java @@ -35,6 +35,9 @@ import javax.net.ServerSocketFactory; +import static org.elasticsearch.test.ESTestCase.assertBusy; +import static org.junit.Assert.assertTrue; + /** * Utility wrapper around Apache {@link SimpleKdcServer} backed by Unboundid * {@link InMemoryDirectoryServer}.
@@ -90,9 +93,7 @@ public Boolean run() throws Exception { AccessController.doPrivileged(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - if (ESTestCase.awaitBusy(() -> init()) == false) { - throw new IllegalStateException("could not initialize SimpleKdcLdapServer"); - } + assertBusy(() -> assertTrue("Failed to initialize SimpleKdcLdapServer", init())); return null; } }); @@ -218,7 +219,7 @@ public synchronized void createPrincipal(final Path keytabFile, final String... /** * Stop Simple Kdc Server - * + * * @throws PrivilegedActionException when privileged action threw exception */ public synchronized void stop() throws PrivilegedActionException { diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java index 64c3a785d14e4..7e88c327127a3 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractUpgradeTestCase.java @@ -6,15 +6,18 @@ package org.elasticsearch.upgrades; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.xpack.test.SecuritySettingsSourceField; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.test.SecuritySettingsSourceField; import org.junit.Before; -import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static org.elasticsearch.xpack.test.SecuritySettingsSourceField.basicAuthHeaderValue; @@ -77,27 +80,31 @@ protected Settings restClientSettings() { } protected Collection templatesToWaitFor() { - return Collections.singletonList("security-index-template"); + return Collections.emptyList(); } @Before public void setupForTests() throws Exception { - awaitBusy(() -> { - boolean success = true; - for (String template : templatesToWaitFor()) { - try { - final Request headRequest = new Request("HEAD", "_template/" + template); - headRequest.setOptions(allowTypesRemovalWarnings()); - final boolean exists = adminClient() - .performRequest(headRequest) - .getStatusLine().getStatusCode() == 200; - success &= exists; - logger.debug("template [{}] exists [{}]", template, exists); - } catch (IOException e) { - logger.warn("error calling template api", e); - } + final Collection expectedTemplates = templatesToWaitFor(); + + if (expectedTemplates.isEmpty()) { + return; + } + + assertBusy(() -> { + final Request catRequest = new Request("GET", "_cat/templates?h=n&s=n"); + final Response catResponse = adminClient().performRequest(catRequest); + + final List templates = Streams.readAllLines(catResponse.getEntity().getContent()); + + final List missingTemplates = expectedTemplates.stream() + .filter(each -> templates.contains(each) == false) + .collect(Collectors.toList()); + + // While it's possible to use a Hamcrest matcher for this, the failure is much less legible. + if (missingTemplates.isEmpty() == false) { + fail("Some expected templates are missing: " + missingTemplates + ". The templates that exist are: " + templates + ""); } - return success; }); } } diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java index 9aaef80f011c2..255edf32261e1 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/DataFrameSurvivesUpgradeIT.java @@ -37,6 +37,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -68,8 +69,12 @@ public class DataFrameSurvivesUpgradeIT extends AbstractUpgradeTestCase { @Override protected Collection templatesToWaitFor() { - return Stream.concat(XPackRestTestConstants.DATA_FRAME_TEMPLATES.stream(), - super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_4_0)) { + return Stream.concat(XPackRestTestConstants.DATA_FRAME_TEMPLATES.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } else { + return Collections.emptySet(); + } } protected static void waitForPendingDataFrameTasks() throws Exception { diff --git a/x-pack/qa/security-client-tests/src/test/java/org/elasticsearch/xpack/security/qa/SecurityTransportClientIT.java b/x-pack/qa/security-client-tests/src/test/java/org/elasticsearch/xpack/security/qa/SecurityTransportClientIT.java index 519f365d515a0..6892f640415c4 100644 --- a/x-pack/qa/security-client-tests/src/test/java/org/elasticsearch/xpack/security/qa/SecurityTransportClientIT.java +++ b/x-pack/qa/security-client-tests/src/test/java/org/elasticsearch/xpack/security/qa/SecurityTransportClientIT.java @@ -51,9 +51,7 @@ protected Collection> transportClientPlugins() { public void testThatTransportClientWithoutAuthenticationDoesNotWork() throws Exception { try (TransportClient client = transportClient(Settings.EMPTY)) { - boolean connected = awaitBusy(() -> { - return client.connectedNodes().size() > 0; - }, 5L, TimeUnit.SECONDS); + boolean connected = waitUntil(() -> client.connectedNodes().size() > 0, 5L, TimeUnit.SECONDS); assertThat(connected, is(false)); } @@ -64,11 +62,7 @@ public void testThatTransportClientAuthenticationWithTransportClientRole() throw .put(SecurityField.USER_SETTING.getKey(), TRANSPORT_USER_PW) .build(); try (TransportClient client = transportClient(settings)) { - boolean connected = awaitBusy(() -> { - return client.connectedNodes().size() > 0; - }, 5L, TimeUnit.SECONDS); - - assertThat(connected, is(true)); + assertBusy(() -> assertFalse(client.connectedNodes().isEmpty()), 5L, TimeUnit.SECONDS); // this checks that the transport client is really running in a limited state try { @@ -86,11 +80,7 @@ public void testTransportClientWithAdminUser() throws Exception { .put(SecurityField.USER_SETTING.getKey(), useTransportUser ? TRANSPORT_USER_PW : ADMIN_USER_PW) .build(); try (TransportClient client = transportClient(settings)) { - boolean connected = awaitBusy(() -> { - return client.connectedNodes().size() > 0; - }, 5L, TimeUnit.SECONDS); - - assertThat(connected, is(true)); + assertBusy(() -> assertFalse(client.connectedNodes().isEmpty()), 5L, TimeUnit.SECONDS); // this checks that the transport client is really running in a limited state ClusterHealthResponse response; diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java index d95a08b467adc..d09d09bbc40fe 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java @@ -23,25 +23,23 @@ public final class XPackRestTestConstants { // ML constants: public static final String ML_META_INDEX_NAME = ".ml-meta"; - public static final String AUDITOR_NOTIFICATIONS_INDEX = ".ml-notifications-000001"; public static final String CONFIG_INDEX = ".ml-config"; public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-"; public static final String STATE_INDEX_PREFIX = ".ml-state"; public static final String RESULTS_INDEX_DEFAULT = "shared"; public static final List ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(Arrays.asList( - AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX)); + ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX)); public static final List ML_POST_V660_TEMPLATES = Collections.unmodifiableList(Arrays.asList( - AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX, CONFIG_INDEX)); // Data Frame constants: - public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-1"; + public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-2"; public static final String DATA_FRAME_NOTIFICATIONS_INDEX = ".data-frame-notifications-1"; public static final List DATA_FRAME_TEMPLATES = diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java index 6ad16d512ef0b..f36b0cb7f379e 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestHelper.java @@ -5,22 +5,25 @@ */ package org.elasticsearch.xpack.test.rest; - import org.apache.http.util.EntityUtils; import org.elasticsearch.Version; import org.elasticsearch.client.Request; -import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.rest.ESRestTestCase.allowTypesRemovalWarnings; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.fail; public final class XPackRestTestHelper { @@ -31,50 +34,53 @@ private XPackRestTestHelper() { * For each template name wait for the template to be created and * for the template version to be equal to the master node version. * - * @param client The rest client - * @param templateNames Names of the templates to wait for + * @param client The rest client + * @param expectedTemplates Names of the templates to wait for * @throws InterruptedException If the wait is interrupted */ - public static void waitForTemplates(RestClient client, List templateNames) throws InterruptedException { + public static void waitForTemplates(RestClient client, List expectedTemplates) throws Exception { AtomicReference masterNodeVersion = new AtomicReference<>(); - ESTestCase.awaitBusy(() -> { - String response; - try { - Request request = new Request("GET", "/_cat/nodes"); - request.addParameter("h", "master,version"); - response = EntityUtils.toString(client.performRequest(request).getEntity()); - } catch (IOException e) { - throw new RuntimeException(e); - } + + assertBusy(() -> { + Request request = new Request("GET", "/_cat/nodes"); + request.addParameter("h", "master,version"); + String response = EntityUtils.toString(client.performRequest(request).getEntity()); + for (String line : response.split("\n")) { if (line.startsWith("*")) { masterNodeVersion.set(Version.fromString(line.substring(2).trim())); - return true; + return; } } - return false; + fail("No master elected"); }); - for (String template : templateNames) { - ESTestCase.awaitBusy(() -> { - Map response; - try { - final Request getRequest = new Request("GET", "_template/" + template); - getRequest.setOptions(allowTypesRemovalWarnings()); - String string = EntityUtils.toString(client.performRequest(getRequest).getEntity()); - response = XContentHelper.convertToMap(JsonXContent.jsonXContent, string, false); - } catch (ResponseException e) { - if (e.getResponse().getStatusLine().getStatusCode() == 404) { - return false; - } - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } + assertBusy(() -> { + final Request request = new Request("GET", "_template"); + request.setOptions(allowTypesRemovalWarnings()); + + String string = EntityUtils.toString(client.performRequest(request).getEntity()); + Map response = XContentHelper.convertToMap(JsonXContent.jsonXContent, string, false); + + final Set templates = new TreeSet<>(response.keySet()); + + final List missingTemplates = expectedTemplates.stream() + .filter(each -> templates.contains(each) == false) + .collect(Collectors.toList()); + + // While it's possible to use a Hamcrest matcher for this, the failure is much less legible. + if (missingTemplates.isEmpty() == false) { + fail("Some expected templates are missing: " + missingTemplates + ". The templates that exist are: " + templates + ""); + } + + expectedTemplates.forEach(template -> { Map templateDefinition = (Map) response.get(template); - return Version.fromId((Integer) templateDefinition.get("version")).equals(masterNodeVersion.get()); + assertThat( + "Template [" + template + "] has unexpected version", + Version.fromId((Integer) templateDefinition.get("version")), + equalTo(masterNodeVersion.get())); }); - } + }); } public static String resultsWriteAlias(String jobId) {