Merged
51 commits
6f80ece
Remove awaitBusy in favor of assertBusy (#28450)
liketic Feb 7, 2018
af90601
Merge branch 'master' into fix-issues/28450
liketic Mar 2, 2018
86e25e8
Fix comments
liketic Mar 2, 2018
4c97324
Remove wait for relocation
liketic Mar 6, 2018
2a1ae3d
Merge branch 'master' into fix-issues/28450
liketic Mar 7, 2018
d98ba94
Merge branch 'master' into fix-issues/28450
javanna Apr 30, 2018
14f1231
Remove awaitBusy in xpack
liketic May 2, 2018
ef37820
Merge branch 'master' into fix-issues/28450
javanna May 4, 2018
5c117ba
Merge branch 'master' of github.com:elastic/elasticsearch into fix-is…
liketic May 5, 2018
5eaf479
Remove awaitBusy
liketic May 5, 2018
6d1aef7
Resolve conflicts
liketic Jun 22, 2018
e82ca0a
Remove redundant import
liketic Jun 22, 2018
f778984
Merge branch 'master' into fix-issues/28450
Oct 17, 2018
20f0408
Changing a few more cases
Oct 17, 2018
958fc33
Merge branch 'master' into fix-issues/28450
Oct 18, 2018
5489df9
Changing one more awaitsBusy
Oct 18, 2018
2ff0772
Merge branch 'master' of github.com:elastic/elasticsearch into fix-is…
liketic Oct 20, 2018
7a09e12
Merge master
liketic Oct 27, 2018
48cdb34
Remove unused import
liketic Oct 27, 2018
4ce3982
Merge master
liketic Dec 21, 2018
90cdd97
Replace awaitBusy with assertBusy
liketic Dec 21, 2018
78cd0e5
remove unused imports
liketic Dec 22, 2018
668870d
Ensure empty wait not throw error
liketic Dec 22, 2018
e920c1d
Merge remote-tracking branch 'liketic/fix-issues/28450' into 28450-re…
pugnascotia Aug 22, 2019
098bebf
Replace further uses of awaitBusy with assertBusy
pugnascotia Aug 22, 2019
de7e4fd
Rework waitForDocs to scale its timeout
pugnascotia Aug 23, 2019
25d5037
Sync waitForDocs implementations
pugnascotia Aug 26, 2019
1a09f9b
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Aug 26, 2019
bb44273
Simplify waitForDocs further
pugnascotia Aug 28, 2019
c6209b7
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Aug 28, 2019
96b5539
Fix checkstyle
pugnascotia Aug 28, 2019
0db95a4
Fixes
pugnascotia Aug 28, 2019
82ca600
Fix checkstyle again
pugnascotia Aug 28, 2019
3e417f4
checkstyle checkstyle checkstyle
pugnascotia Aug 28, 2019
9724bde
checkstyle
pugnascotia Aug 28, 2019
a493c18
Fix bug in DataFrameIndexerTests
pugnascotia Aug 29, 2019
7a87efd
Test fix
pugnascotia Aug 29, 2019
ef9b64c
Silence broken test
pugnascotia Sep 3, 2019
4a67253
Fix ActivateWatchTests
pugnascotia Sep 3, 2019
513c7ed
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Sep 4, 2019
2187a87
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Sep 12, 2019
9133d2c
Merge branch 'master' into 28450-remove-await-busy
elasticmachine Sep 13, 2019
b3067f5
Merge branch 'master' into 28450-remove-await-busy
elasticmachine Sep 17, 2019
4990279
Correct a template name
pugnascotia Sep 18, 2019
7c74266
Correct the expect templates in AbstractUpgradeTestCase
pugnascotia Sep 18, 2019
bafec6d
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Sep 18, 2019
3cf7b66
Remove an expected ML template from XPackRestTestConstants
pugnascotia Sep 19, 2019
44e36d9
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Sep 19, 2019
492946a
Merge remote-tracking branch 'upstream/master' into 28450-remove-awai…
pugnascotia Sep 19, 2019
b620ca4
Merge branch 'master' into 28450-remove-await-busy
elasticmachine Sep 23, 2019
c616331
Merge branch 'master' into 28450-remove-await-busy
elasticmachine Sep 25, 2019
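The diffs below all apply the same migration: polling code that used `awaitBusy` — which returns a boolean and forces callers to wrap it in `assertTrue` — now uses `assertBusy` where the wait ends in assertions, or `waitUntil` where a plain boolean condition is enough. The sketch below illustrates the semantic difference; it is a simplified, hypothetical reimplementation for illustration only, not the actual `ESTestCase` helpers, and the fixed 50 ms poll interval is an assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Illustrative sketch only: simplified stand-ins for the test helpers touched by
// this PR, showing why assertBusy produces better failures than assertTrue(awaitBusy(...)).
public class BusyWaitSketch {

    /** awaitBusy-style: poll a predicate, return false on timeout. */
    static boolean awaitBusyLike(BooleanSupplier predicate, long maxWait, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        while (System.nanoTime() < deadline) {
            if (predicate.getAsBoolean()) {
                return true;
            }
            Thread.sleep(50); // assumed poll interval
        }
        return predicate.getAsBoolean();
    }

    /** assertBusy-style: retry a block of assertions, rethrow the last failure on timeout. */
    static void assertBusyLike(Runnable assertions, long maxWait, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        AssertionError last;
        while (true) {
            try {
                assertions.run();
                return; // assertions passed
            } catch (AssertionError e) {
                last = e;
            }
            if (System.nanoTime() >= deadline) {
                throw last; // the caller sees which assertion kept failing
            }
            Thread.sleep(50);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();

        // awaitBusy only yields "false"; the reason for the timeout is lost.
        boolean done = awaitBusyLike(() -> System.currentTimeMillis() - start > 200, 1, TimeUnit.SECONDS);
        System.out.println("awaitBusy-style result: " + done);

        // assertBusy carries the failing assertion's message if it never passes.
        assertBusyLike(() -> {
            if (System.currentTimeMillis() - start <= 400) {
                throw new AssertionError("condition not yet met");
            }
        }, 1, TimeUnit.SECONDS);
        System.out.println("assertBusy-style wait completed");
    }
}
```

The practical gain shows up throughout the diff: on timeout, `assertBusy` rethrows the last failing assertion, so the test failure reports which condition never became true instead of a bare `expected: true but was: false`.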
@@ -872,7 +872,7 @@ private String createExpiredData(String jobId) throws Exception {
// b) is slightly more efficient since we may not need to wait an entire second for the timestamp to increment
assertBusy(() -> {
long timeNow = System.currentTimeMillis() / 1000;
assertFalse(prevJobTimeStamp >= timeNow);
assertThat(prevJobTimeStamp, lessThan(timeNow));
});

// Update snapshot timestamp to force it out of snapshot retention window
@@ -890,7 +890,8 @@ private String createExpiredData(String jobId) throws Exception {
waitForForecastToComplete(jobId, forecastJobResponse.getForecastId());

// Wait for the forecast to expire
awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// FIXME: We should wait for something specific to change, rather than waiting for time to pass.
waitUntil(() -> false, 1, TimeUnit.SECONDS);

// Run up to now
startDatafeed(datafeedId, String.valueOf(0), String.valueOf(nowMillis));
@@ -934,7 +935,9 @@ public void testDeleteExpiredData() throws Exception {

assertTrue(response.getDeleted());

awaitBusy(() -> false, 1, TimeUnit.SECONDS);
// Wait for the forecast to expire
// FIXME: We should wait for something specific to change, rather than waiting for time to pass.
waitUntil(() -> false, 1, TimeUnit.SECONDS);

GetModelSnapshotsRequest getModelSnapshotsRequest1 = new GetModelSnapshotsRequest(jobId);
GetModelSnapshotsResponse getModelSnapshotsResponse1 = execute(getModelSnapshotsRequest1, machineLearningClient::getModelSnapshots,
@@ -2049,8 +2052,6 @@ private void updateModelSnapshotTimestamp(String jobId, String timestamp) throws
highLevelClient().update(updateSnapshotRequest, RequestOptions.DEFAULT);
}



private String createAndPutDatafeed(String jobId, String indexName) throws IOException {
String datafeedId = jobId + "-feed";
DatafeedConfig datafeed = DatafeedConfig.builder(datafeedId, jobId)
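In the hunks above, `awaitBusy(() -> false, 1, TimeUnit.SECONDS)` becomes `waitUntil(() -> false, 1, TimeUnit.SECONDS)`. Because the supplier never returns true, either call simply waits out the full timeout — effectively a labelled one-second sleep — which is what the new FIXME comments flag. Below is a minimal sketch of that behaviour and of the direction the FIXME points at; the helper is a hypothetical stand-in and `isForecastExpired` is an invented name, not an Elasticsearch API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Sketch of why waitUntil(() -> false, 1, SECONDS) is effectively a one-second
// sleep: a supplier that never returns true always runs until the deadline.
// The poll interval and all names here are illustrative assumptions.
public class ForecastExpirySketch {

    static boolean waitUntilLike(BooleanSupplier condition, long maxWait, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(10);
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();

        // Equivalent of waitUntil(() -> false, 1, TimeUnit.SECONDS): just burns the timeout.
        waitUntilLike(() -> false, 1, TimeUnit.SECONDS);
        System.out.println("fixed wait took ~" + (System.currentTimeMillis() - start) + "ms");

        // What the FIXME suggests instead: poll something observable, e.g. a
        // hypothetical isForecastExpired() check, so the wait ends as soon as
        // the state actually changes:
        // waitUntilLike(() -> isForecastExpired(jobId, forecastId), 30, TimeUnit.SECONDS);
    }
}
```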
@@ -195,7 +195,7 @@ public void testUpdateByQuery() {
}
}

public void testTasks() throws InterruptedException {
public void testTasks() throws Exception {
final Client client = client();
final ReindexRequestBuilder builder = reindexAndPartiallyBlock();

@@ -279,7 +279,7 @@ public void onFailure(Exception e) {
* Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state
* @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher)
*/
private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedException {
private ReindexRequestBuilder reindexAndPartiallyBlock() throws Exception {
final Client client = client();
final int numDocs = randomIntBetween(10, 100);
ALLOWED_OPERATIONS.release(numDocs);
@@ -305,9 +305,12 @@ private ReindexRequestBuilder reindexAndPartiallyBlock() throws InterruptedExcep
builder.execute();

// 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
1, TimeUnit.MINUTES));
assertBusy(
() -> {
assertTrue("Expected some queued threads", ALLOWED_OPERATIONS.hasQueuedThreads());
assertEquals("Expected that no permits are available", 0, ALLOWED_OPERATIONS.availablePermits());
},
1, TimeUnit.MINUTES);
return builder;
}

@@ -117,10 +117,9 @@ private void testCancel(String action, AbstractBulkByScrollRequestBuilder<?, ?>
* exhausted their slice while others might have quite a bit left
* to work on. We can't control that. */
logger.debug("waiting for updates to be blocked");
boolean blocked = awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
assertBusy(
() -> assertTrue("updates blocked", ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0),
1, TimeUnit.MINUTES); // 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", blocked);

// Status should show the task running
TaskInfo mainTask = findTaskToCancel(action, builder.request().getSlices());
@@ -240,7 +240,7 @@ public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
LogManager.getLogger(SearchRestCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
awaitBusy(() -> shouldBlock.get() == false);
waitUntil(() -> shouldBlock.get() == false);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -167,7 +167,7 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task)
// Simulate a job that takes forever to finish
// Using periodic checks method to identify that the task was cancelled
try {
awaitBusy(() -> {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new TaskCancelledException("Cancelled");
}
@@ -75,7 +75,7 @@
import java.util.Objects;

import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.test.ESTestCase.awaitBusy;
import static org.elasticsearch.test.ESTestCase.waitUntil;

/**
* A plugin that adds a cancellable blocking test task of integration testing of the task manager.
@@ -305,7 +305,7 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {
logger.info("Test task started on the node {}", clusterService.localNode());
if (request.shouldBlock) {
try {
awaitBusy(() -> {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled!");
}
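The two plugin hunks above keep the "block forever until cancelled" pattern, only switching the helper from `awaitBusy` to `waitUntil`: the supplier never returns true, and instead throws once the task reports it has been cancelled, which ends the wait promptly. Below is a self-contained sketch of that pattern, assuming a simple polling helper and using an `AtomicBoolean` as a stand-in for `CancellableTask`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Sketch of the blocking pattern in the cancellable-task test plugins above:
// the task "runs forever" by polling a condition that never becomes true, and
// the supplier throws once cancellation is observed. CancellableTask and
// TaskCancelledException are stubbed here with plain Java types.
public class CancellableBlockSketch {

    static boolean waitUntilLike(BooleanSupplier condition, long maxWait, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(maxWait);
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(10);
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean cancelled = new AtomicBoolean(false);

        // Cancel the "task" from another thread after 200ms.
        Thread canceller = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            cancelled.set(true);
        });
        canceller.start();

        try {
            // Mirrors waitUntil(() -> { if (task.isCancelled()) throw ...; return false; })
            waitUntilLike(() -> {
                if (cancelled.get()) {
                    throw new RuntimeException("Cancelled!");
                }
                return false; // never unblocks on its own
            }, 10, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            System.out.println("task observed cancellation: " + e.getMessage());
        }
        canceller.join();
    }
}
```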
@@ -128,10 +128,11 @@ public void testTwoNodesNoMasterBlock() throws Exception {
Settings masterDataPathSettings = internalCluster().dataPathSettings(masterNode);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNode));

awaitBusy(() -> {
assertBusy(() -> {
ClusterState clusterState = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
return clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID);
assertTrue(clusterState.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});

state = client().admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID), equalTo(true));
// verify that both nodes are still in the cluster state but there is no master
@@ -219,11 +219,10 @@ public void testNoMasterActionsWriteMasterBlock() throws Exception {

final Client clientToMasterlessNode = client();

assertTrue(awaitBusy(() -> {
ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState();
return state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID);
}
));
assertBusy(() -> {
ClusterState state = clientToMasterlessNode.admin().cluster().prepareState().setLocal(true).get().getState();
assertTrue(state.blocks().hasGlobalBlockWithId(NoMasterBlockService.NO_MASTER_BLOCK_ID));
});

GetResponse getResponse = clientToMasterlessNode.prepareGet("test1", "1").get();
assertExists(getResponse);
@@ -38,9 +38,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

@ClusterScope(scope= ESIntegTestCase.Scope.TEST, numDataNodes =0, minNumDataNodes = 2)
@@ -78,40 +80,43 @@ public void testSimpleAwareness() throws Exception {
final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_2").build());

// On slow machines the initial relocation might be delayed
assertThat(awaitBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();
if (clusterHealth.isTimedOut()) {
return false;
}

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
// check that closed indices are effectively closed
if (indicesToClose.stream().anyMatch(index -> clusterState.metaData().index(index).getState() != State.CLOSE)) {
return false;
}
// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
assertBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();

assertThat("Cluster health request timed out", clusterHealth.isTimedOut(), equalTo(false));

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();

// check that closed indices are effectively closed
final List<String> notClosedIndices =
indicesToClose.stream()
.filter(index -> clusterState.metaData().index(index).getState() != State.CLOSE)
.collect(Collectors.toList());
assertThat("Some indices not closed", notClosedIndices, empty());

// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
counts.addTo(clusterState.nodes().get(shardRouting.currentNodeId()).getName(), 1);
}
}
return counts.get(node3) == totalPrimaries;
},
10,
TimeUnit.SECONDS
), equalTo(true));
}
assertThat(counts.get(node3), equalTo(totalPrimaries));
},
10,
TimeUnit.SECONDS
);
}

public void testAwarenessZones() {
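The `AwarenessAllocationIT` rework above is the largest single change and shows the payoff of the migration: every `return false` guard inside the old `awaitBusy` predicate becomes an assertion with a message, and the closed-indices check now collects offenders into a list so a timeout reports exactly which indices were still open. Below is a minimal sketch of that guard-to-assertion conversion, using hypothetical names and a plain `AssertionError` in place of Hamcrest matchers.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of converting boolean "return false" guards into descriptive assertions.
// The State enum and the index map are hypothetical stand-ins for cluster metadata.
public class GuardToAssertionSketch {

    enum State { OPEN, CLOSE }

    // Before: opaque — the caller only learns "false".
    static boolean allClosed(Map<String, State> indices, List<String> indicesToClose) {
        return indicesToClose.stream().allMatch(index -> indices.get(index) == State.CLOSE);
    }

    // After: the failure message names the offending indices.
    static void assertAllClosed(Map<String, State> indices, List<String> indicesToClose) {
        List<String> notClosed = indicesToClose.stream()
            .filter(index -> indices.get(index) != State.CLOSE)
            .collect(Collectors.toList());
        if (!notClosed.isEmpty()) {
            throw new AssertionError("Some indices not closed: " + notClosed);
        }
    }

    public static void main(String[] args) {
        Map<String, State> indices = Map.of("test1", State.CLOSE, "test2", State.OPEN);
        List<String> toClose = List.of("test1", "test2");

        System.out.println("allClosed = " + allClosed(indices, toClose)); // just "false"
        try {
            assertAllClosed(indices, toClose);
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // "Some indices not closed: [test2]"
        }
    }
}
```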
@@ -400,8 +400,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
block1.countDown();
invoked2.await();

// whenever we test for no tasks, we need to awaitBusy since this is a live node
assertTrue(awaitBusy(() -> clusterService.getMasterService().pendingTasks().isEmpty()));
// whenever we test for no tasks, we need to wait since this is a live node
assertBusy(() -> assertTrue("Pending tasks not empty", clusterService.getMasterService().pendingTasks().isEmpty()));
waitNoPendingTasksOnAll();

final CountDownLatch block2 = new CountDownLatch(1);
@@ -328,7 +328,7 @@ public void run() {
entry = simulations.get(nodeId);
if (entry == null) {
// we are simulating a master node switch, wait for it to not be null
awaitBusy(() -> simulations.containsKey(nodeId));
assertBusy(() -> assertTrue(simulations.containsKey(nodeId)));
}
assert entry != null;
entry.executeLatch.await();
@@ -68,13 +68,15 @@ public void testQuorumRecovery() throws Exception {
@Override
public void doAfterNodes(int numNodes, final Client activeClient) throws Exception {
if (numNodes == 1) {
assertTrue(awaitBusy(() -> {
assertBusy(() -> {
logger.info("--> running cluster_health (wait for the shards to startup)");
ClusterHealthResponse clusterHealth = activeClient.admin().cluster().health(clusterHealthRequest()
.waitForYellowStatus().waitForNodes("2").waitForActiveShards(test.numPrimaries * 2)).actionGet();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
return (!clusterHealth.isTimedOut()) && clusterHealth.getStatus() == ClusterHealthStatus.YELLOW;
}, 30, TimeUnit.SECONDS));
assertFalse(clusterHealth.isTimedOut());
assertEquals(ClusterHealthStatus.YELLOW, clusterHealth.getStatus());
}, 30, TimeUnit.SECONDS);

logger.info("--> one node is closed -- index 1 document into the remaining nodes");
activeClient.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3")
.endObject()).get();
@@ -70,7 +70,7 @@ public static CompressedXContent filter(QueryBuilder filterBuilder) throws IOExc
return new CompressedXContent(Strings.toString(builder));
}

public void testBaseAsyncTask() throws InterruptedException, IOException {
public void testBaseAsyncTask() throws Exception {
IndexService indexService = createIndex("test", Settings.EMPTY);
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
AtomicReference<CountDownLatch> latch2 = new AtomicReference<>(new CountDownLatch(1));
@@ -126,7 +126,7 @@ protected void runInternal() {
// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));

final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
@@ -136,7 +136,7 @@

// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);

@@ -204,7 +204,7 @@ public void testRefreshTaskIsUpdated() throws Exception {
// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));

final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
@@ -215,7 +215,7 @@

// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
refreshTask = indexService.getRefreshTask();
@@ -241,7 +241,7 @@ public void testFsyncTaskIsRunning() throws Exception {
// now close the index
final Index index = indexService.index();
assertAcked(client().admin().indices().prepareClose(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));

final IndexService closedIndexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(indexService, closedIndexService);
@@ -252,7 +252,7 @@

// now reopen the index
assertAcked(client().admin().indices().prepareOpen(index.getName()));
awaitBusy(() -> getInstanceFromNode(IndicesService.class).hasIndex(index));
assertBusy(() -> assertTrue("Index not found: " + index.getName(), getInstanceFromNode(IndicesService.class).hasIndex(index)));
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(index);
assertNotSame(closedIndexService, indexService);
fsyncTask = indexService.getFsyncTask();