Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ private String createExpiredData(String jobId) throws Exception {
// b) is slightly more efficient since we may not need to wait an entire second for the timestamp to increment
assertBusy(() -> {
long timeNow = System.currentTimeMillis() / 1000;
assertFalse(prevJobTimeStamp >= timeNow);
assertThat(prevJobTimeStamp, lessThan(timeNow));
});

// Update snapshot timestamp to force it out of snapshot retention window
Expand All @@ -920,7 +920,8 @@ private String createExpiredData(String jobId) throws Exception {
waitForForecastToComplete(jobId, forecastJobResponse.getForecastId());

// Wait for the forecast to expire
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// FIXME: We should wait for something specific to change, rather than waiting for time to pass.
waitUntil(() -> false, 1, TimeUnit.SECONDS);

// Run up to now
startDatafeed(datafeedId, String.valueOf(0), String.valueOf(nowMillis));
Expand Down Expand Up @@ -964,7 +965,9 @@ public void testDeleteExpiredData() throws Exception {

assertTrue(response.getDeleted());

awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// Wait for the forecast to expire
// FIXME: We should wait for something specific to change, rather than waiting for time to pass.
waitUntil(() -> false, 1, TimeUnit.SECONDS);

GetModelSnapshotsRequest getModelSnapshotsRequest1 = new GetModelSnapshotsRequest(jobId);
GetModelSnapshotsResponse getModelSnapshotsResponse1 = execute(getModelSnapshotsRequest1, machineLearningClient::getModelSnapshots,
Expand Down Expand Up @@ -2079,8 +2082,6 @@ private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws
highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT);
}



private String createAndPutDatafeed(String jobId, String indexName) throws IOException {
String datafeedId = jobId + "-feed";
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void testUpdateByQuery() {
}
}

public void testTasks() throws InterruptedException {
public void testTasks() throws Exception {
final Client client = client();
final ReindexRequestBuilder builder = reindexAndPartiallyBlock();

Expand Down Expand Up @@ -284,7 +284,7 @@ public void onFailure(Exception e) {
* Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state
* @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher)
*/
private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedException {
private ReindexRequestBuilder reindexAndPartiallyBlock() throws Exception {
final Client client = client();
final int numDocs = randomIntBetween(10, 100);
ALLOWED_OPERATIONS.release(numDocs);
Expand All @@ -310,9 +310,12 @@ private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedExcep
builder.execute();

// 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
1, TimeUnit.MINUTES));
assertBusy(
() -> {
assertTrue("Expected some queued threads", ALLOWED_OPERATIONS.hasQueuedThreads());
assertEquals("Expected that no permits are available", 0, ALLOWED_OPERATIONS.availablePermits());
},
1, TimeUnit.MINUTES);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ private void testCancel(String action, AbstractBulkByScrollRequestBuilder<?, ?>
* exhausted their slice while others might have quite a bit left
* to work on. We can't control that. */
logger.debug("waiting for updates to be blocked");
boolean blocked = awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
assertBusy(
() -> assertTrue("updates blocked", ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0),
1, TimeUnit.MINUTES); // 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", blocked);

// Status should show the task running
TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
LogManager.getLogger(SearchRestCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
awaitBusy(() -> shouldBlock.get() == false);
waitUntil(() -> shouldBlock.get() == false);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task)
// Simulate a job that takes forever to finish
// Using periodic checks method to identify that the task was cancelled
try {
awaitBusy(() -> {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new TaskCancelledException("Cancelled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
import java.util.Objects;

import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.waitUntil;

/**
* A plugin that adds a cancellable blocking test task of integration testing of the task manager.
Expand Down Expand Up @@ -305,7 +305,7 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {
logger.info("Test task started on the node {}", clusterService.localNode());
if (request.shouldBlock) {
try {
awaitBusy(() -> {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,11 @@ public void testTwoNodesNoMasterBlock() throws Exception {
Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNode));

awaitBusy(() -> {
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID);
assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});

state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
// verify that both nodes are still in the cluster state but there is no master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,10 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception {

final Client clientToMasterlessNode = client();

assertTrue(awaitBusy(() -> {
ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState();
return state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID);
}
));
assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});

GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "type1", "1").get();
assertExists(getResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope= ESIntegTestCase.Scope.TEST, numDataNodes =0, minNumDataNodes = 2)
Expand Down Expand Up @@ -78,40 +80,43 @@ public void testSimpleAwareness() throws Exception {
final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_2").build());

// On slow machines the initial relocation might be delayed
assertThat(awaitBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();
if (clusterHealth.isTimedOut()) {
return false;
}

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
// check that closed indices are effectively closed
if (indicesToClose.stream().anyMatch(index -> clusterState.metaData().index(index).getState() != State.CLOSE)) {
return false;
}
// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
assertBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();

assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false));

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

// check that closed indices are effectively closed
final List<String> notClosedIndices =
indicesToClose.stream()
.filter(index -> clusterState.metaData().index(index).getState() != State.CLOSE)
.collect(Collectors.toList());
assertThat("Some indices not closed", notClosedIndices, empty());

// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
}
return counts.get(node3) == totalPrimaries;
},
10,
TimeUnit.SECONDS
), equalTo(true));
}
assertThat(counts.get(node3), equalTo(totalPrimaries));
},
10,
TimeUnit.SECONDS
);
}

public void testAwarenessZones() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
block1.countDown();
invoked2.await();

// whenever we test for no tasks, we need to awaitBusy since this is a live node
assertTrue(awaitBusy(() -> clusterService.getMasterService().pendingTasks().isEmpty()));
// whenever we test for no tasks, we need to wait since this is a live node
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty()));
waitNoPendingTasksOnAll();

final CountDownLatch block2 = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,21 +282,17 @@ public void testMappingTimeout() throws Exception {

}

private void assertDiscoveryCompleted(List<String> nodes) throws InterruptedException {
private void assertDiscoveryCompleted(List<String> nodes) throws Exception {
for (final String node : nodes) {
assertTrue(
"node [" + node + "] is still joining master",
awaitBusy(
() -> {
final Discovery discovery = internalCluster().getInstance(Discovery.class, node);
if (discovery instanceof ZenDiscovery) {
return !((ZenDiscovery) discovery).joiningCluster();
}
return true;
},
30,
TimeUnit.SECONDS
)
assertBusy(
() -> {
final Discovery discovery = internalCluster().getInstance(Discovery.class, node);
if (discovery instanceof ZenDiscovery) {
assertFalse("node [" + node + "] is still joining master", ((ZenDiscovery) discovery).joiningCluster());
}
},
30,
TimeUnit.SECONDS
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public void run() {
entry = simulations.get(nodeId);
if (entry == null) {
// we are simulating a master node switch, wait for it to not be null
awaitBusy(() -> simulations.containsKey(nodeId));
assertBusy(() -> assertTrue(simulations.containsKey(nodeId)));
}
assert entry != null;
entry.executeLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ public void testQuorumRecovery() throws Exception {
@Override
public void doAfterNodes(int numNodes, final Client activeClient) throws Exception {
if (numNodes == 1) {
assertTrue(awaitBusy(() -> {
assertBusy(() -> {
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest()
.waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW;
}, 30, TimeUnit.SECONDS));
assertFalse(clusterHealth.isTimedOut());
assertEquals(ClusterHealthStatus.YELLOW, clusterHealth.getStatus());
}, 30, TimeUnit.SECONDS);

logger.info("--> one node is closed -- index 1 document into the remaining nodes");
activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3")
.endObject()).get();
Expand Down
Loading