Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,11 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task)
if (shouldBlock) {
// Simulate a job that takes forever to finish
// Using periodic checks method to identify that the task was cancelled
try {
waitUntil(() -> {
((CancellableTask) task).ensureNotCancelled();
return false;
});
fail("It should have thrown an exception");
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
waitUntil(() -> {
((CancellableTask) task).ensureNotCancelled();
return false;
});
fail("It should have thrown an exception");
}
debugDelay("op4");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,12 @@ protected void doExecute(Task task, NodesRequest request, ActionListener<NodesRe
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
logger.info("Test task started on the node {}", clusterService.localNode());
if (request.shouldBlock) {
try {
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled!");
}
return ((TestTask) task).isBlocked() == false;
});
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
waitUntil(() -> {
if (((CancellableTask) task).isCancelled()) {
throw new RuntimeException("Cancelled!");
}
return ((TestTask) task).isBlocked() == false;
});
}
logger.info("Test task finished on the node {}", clusterService.localNode());
return new NodeResponse(clusterService.localNode());
Expand All @@ -301,9 +297,7 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) {

public static class UnblockTestTaskResponse implements Writeable {

UnblockTestTaskResponse() {

}
UnblockTestTaskResponse() {}

UnblockTestTaskResponse(StreamInput in) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,23 +1192,19 @@ public static List<PersistentTasksCustomMetadata.PersistentTask<?>> findTasks(Cl
@Nullable
public static DiscoveryNode waitAndGetHealthNode(InternalTestCluster internalCluster) {
DiscoveryNode[] healthNode = new DiscoveryNode[1];
try {
waitUntil(() -> {
ClusterState state = internalCluster.client()
.admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setNodes(true)
.get()
.getState();
healthNode[0] = HealthNode.findHealthNode(state);
return healthNode[0] != null;
}, 15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
waitUntil(() -> {
ClusterState state = internalCluster.client()
.admin()
.cluster()
.prepareState()
.clear()
.setMetadata(true)
.setNodes(true)
.get()
.getState();
healthNode[0] = HealthNode.findHealthNode(state);
return healthNode[0] != null;
}, 15, TimeUnit.SECONDS);
return healthNode[0];
}

Expand Down Expand Up @@ -1640,7 +1636,7 @@ protected static IndicesAdminClient indicesAdmin() {
return admin().indices();
}

public void indexRandom(boolean forceRefresh, String index, int numDocs) throws InterruptedException {
public void indexRandom(boolean forceRefresh, String index, int numDocs) {
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = prepareIndex(index).setSource("field", "value");
Expand All @@ -1651,11 +1647,11 @@ public void indexRandom(boolean forceRefresh, String index, int numDocs) throws
/**
* Convenience method that forwards to {@link #indexRandom(boolean, List)}.
*/
public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) throws InterruptedException {
public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) {
indexRandom(forceRefresh, Arrays.asList(builders));
}

public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexRequestBuilder... builders) throws InterruptedException {
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexRequestBuilder... builders) {
indexRandom(forceRefresh, dummyDocuments, Arrays.asList(builders));
}

Expand All @@ -1674,7 +1670,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexReque
* @param builders the documents to index.
* @see #indexRandom(boolean, boolean, java.util.List)
*/
public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders) throws InterruptedException {
public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders) {
indexRandom(forceRefresh, forceRefresh, builders);
}

Expand All @@ -1690,7 +1686,7 @@ public void indexRandom(boolean forceRefresh, List<IndexRequestBuilder> builders
* all documents are indexed. This is useful to produce deleted documents on the server side.
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) throws InterruptedException {
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<IndexRequestBuilder> builders) {
indexRandom(forceRefresh, dummyDocuments, true, builders);
}

Expand All @@ -1707,8 +1703,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List<Index
* @param maybeFlush if {@code true} this method may randomly execute full flushes after index operations.
* @param builders the documents to index.
*/
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders)
throws InterruptedException {
public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List<IndexRequestBuilder> builders) {
Random random = random();
Set<String> indices = new HashSet<>();
builders = new ArrayList<>(builders);
Expand Down Expand Up @@ -1822,8 +1817,7 @@ private static CountDownLatch newLatch(List<CountDownLatch> latches) {
/**
* Maybe refresh, force merge, or flush then always make sure there aren't too many in flight async operations.
*/
private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations, boolean maybeFlush)
throws InterruptedException {
private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlightAsyncOperations, boolean maybeFlush) {
if (rarely()) {
if (rarely()) {
indicesAdmin().prepareRefresh(indices)
Expand All @@ -1843,7 +1837,7 @@ private void postIndexAsyncActions(String[] indices, List<CountDownLatch> inFlig
}
while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) {
int waitFor = between(0, inFlightAsyncOperations.size() - 1);
inFlightAsyncOperations.remove(waitFor).await();
safeAwait(inFlightAsyncOperations.remove(waitFor));
}
}

Expand Down
41 changes: 25 additions & 16 deletions test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.startsWith;

/**
Expand Down Expand Up @@ -1420,9 +1421,8 @@ public static void assertBusy(CheckedRunnable<Exception> codeBlock, long maxWait
*
* @param breakSupplier determines whether to return immediately or continue waiting.
* @return the last value returned by <code>breakSupplier</code>
* @throws InterruptedException if any sleep calls were interrupted.
*/
public static boolean waitUntil(BooleanSupplier breakSupplier) throws InterruptedException {
public static boolean waitUntil(BooleanSupplier breakSupplier) {
return waitUntil(breakSupplier, 10, TimeUnit.SECONDS);
}

Expand All @@ -1438,22 +1438,21 @@ public static boolean waitUntil(BooleanSupplier breakSupplier) throws Interrupte
* @param maxWaitTime the maximum amount of time to wait
* @param unit the unit of tie for <code>maxWaitTime</code>
* @return the last value returned by <code>breakSupplier</code>
* @throws InterruptedException if any sleep calls were interrupted.
*/
public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) throws InterruptedException {
public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) {
long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit);
long timeInMillis = 1;
long sum = 0;
while (sum + timeInMillis < maxTimeInMillis) {
if (breakSupplier.getAsBoolean()) {
return true;
}
Thread.sleep(timeInMillis);
safeSleep(timeInMillis);
sum += timeInMillis;
timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2);
}
timeInMillis = maxTimeInMillis - sum;
Thread.sleep(Math.max(timeInMillis, 0));
safeSleep(Math.max(timeInMillis, 0));
return breakSupplier.getAsBoolean();
}

Expand Down Expand Up @@ -2505,7 +2504,7 @@ public static <T extends Throwable> T expectThrows(Class<T> expectedType, Reques
* Same as {@link #runInParallel(int, IntConsumer)} but also attempts to start all tasks at the same time by blocking execution on a
* barrier until all threads are started and ready to execute their task.
*/
public static void startInParallel(int numberOfTasks, IntConsumer taskFactory) throws InterruptedException {
public static void startInParallel(int numberOfTasks, IntConsumer taskFactory) {
final CyclicBarrier barrier = new CyclicBarrier(numberOfTasks);
runInParallel(numberOfTasks, i -> {
safeAwait(barrier);
Expand All @@ -2519,7 +2518,7 @@ public static void startInParallel(int numberOfTasks, IntConsumer taskFactory) t
* @param numberOfTasks number of tasks to run in parallel
* @param taskFactory task factory
*/
public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) throws InterruptedException {
public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) {
final ArrayList<Future<?>> futures = new ArrayList<>(numberOfTasks);
final Thread[] threads = new Thread[numberOfTasks - 1];
for (int i = 0; i < numberOfTasks; i++) {
Expand All @@ -2534,16 +2533,26 @@ public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) thr
threads[i].start();
}
}
for (Thread thread : threads) {
thread.join();
}
Exception e = null;
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception ex) {
e = ExceptionsHelper.useOrSuppress(e, ex);
try {
for (Thread thread : threads) {
// no sense in waiting for the rest of the threads, nor any futures, if interrupted, just bail out and fail
thread.join();
}
for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException interruptedException) {
// no sense in waiting for the rest of the futures if interrupted, just bail out and fail
Thread.currentThread().interrupt();
throw interruptedException;
} catch (Exception executionException) {
e = ExceptionsHelper.useOrSuppress(e, executionException);
}
}
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
e = ExceptionsHelper.useOrSuppress(e, interruptedException);
}
if (e != null) {
throw new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1744,11 +1744,7 @@ private synchronized void startAndPublishNodesAndClients(List<NodeAndClient> nod
.filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters
.count();
rebuildUnicastHostFiles(nodeAndClients); // ensure that new nodes can find the existing nodes when they start
try {
runInParallel(nodeAndClients.size(), i -> nodeAndClients.get(i).startNode());
} catch (InterruptedException e) {
throw new AssertionError("interrupted while starting nodes", e);
}
runInParallel(nodeAndClients.size(), i -> nodeAndClients.get(i).startNode());
nodeAndClients.forEach(this::publishNode);

if (autoManageMasterNodes && newMasters > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,8 @@ public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exceptio
for (int i = between(10, 10_000); i >= 0; i--) {
indexRequestBuilders.add(prepareIndex(indexName).setSource("foo", randomBoolean() ? "bar" : "baz"));
}
try {
safeAwait(cyclicBarrier);
indexRandom(true, true, indexRequestBuilders);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
safeAwait(cyclicBarrier);
indexRandom(true, true, indexRequestBuilders);
refresh(indexName);
assertThat(
indicesAdmin().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,59 +401,52 @@ private PortBlockingRunnable(
public void run() {
final List<Socket> openedSockets = new ArrayList<>();
final List<InetAddress> failedAddresses = new ArrayList<>();
try {
final boolean allSocketsOpened = waitUntil(() -> {
try {
final InetAddress[] allAddresses;
if (serverAddress instanceof Inet4Address) {
allAddresses = NetworkUtils.getAllIPV4Addresses();
} else {
allAddresses = NetworkUtils.getAllIPV6Addresses();
}
final List<InetAddress> inetAddressesToBind = Arrays.stream(allAddresses)
.filter(addr -> openedSockets.stream().noneMatch(s -> addr.equals(s.getLocalAddress())))
.filter(addr -> failedAddresses.contains(addr) == false)
.collect(Collectors.toList());
for (InetAddress localAddress : inetAddressesToBind) {
try {
final Socket socket = openMockSocket(serverAddress, serverPort, localAddress, portToBind);
openedSockets.add(socket);
logger.debug("opened socket [{}]", socket);
} catch (NoRouteToHostException | ConnectException e) {
logger.debug(() -> "marking address [" + localAddress + "] as failed due to:", e);
failedAddresses.add(localAddress);
}
}
if (openedSockets.size() == 0) {
logger.debug("Could not open any sockets from the available addresses");
return false;

final boolean allSocketsOpened = waitUntil(() -> {
try {
final InetAddress[] allAddresses;
if (serverAddress instanceof Inet4Address) {
allAddresses = NetworkUtils.getAllIPV4Addresses();
} else {
allAddresses = NetworkUtils.getAllIPV6Addresses();
}
final List<InetAddress> inetAddressesToBind = Arrays.stream(allAddresses)
.filter(addr -> openedSockets.stream().noneMatch(s -> addr.equals(s.getLocalAddress())))
.filter(addr -> failedAddresses.contains(addr) == false)
.collect(Collectors.toList());
for (InetAddress localAddress : inetAddressesToBind) {
try {
final Socket socket = openMockSocket(serverAddress, serverPort, localAddress, portToBind);
openedSockets.add(socket);
logger.debug("opened socket [{}]", socket);
} catch (NoRouteToHostException | ConnectException e) {
logger.debug(() -> "marking address [" + localAddress + "] as failed due to:", e);
failedAddresses.add(localAddress);
}
return true;
} catch (IOException e) {
logger.debug(() -> "caught exception while opening socket on [" + portToBind + "]", e);
}
if (openedSockets.size() == 0) {
logger.debug("Could not open any sockets from the available addresses");
return false;
}
});

if (allSocketsOpened) {
latch.countDown();
} else {
success.set(false);
IOUtils.closeWhileHandlingException(openedSockets);
openedSockets.clear();
latch.countDown();
return;
return true;
} catch (IOException e) {
logger.debug(() -> "caught exception while opening socket on [" + portToBind + "]", e);
return false;
}
} catch (InterruptedException e) {
logger.debug(() -> "interrupted while trying to open sockets on [" + portToBind + "]", e);
Thread.currentThread().interrupt();
});

if (allSocketsOpened) {
latch.countDown();
} else {
success.set(false);
IOUtils.closeWhileHandlingException(openedSockets);
openedSockets.clear();
latch.countDown();
return;
}

try {
closeLatch.await();
} catch (InterruptedException e) {
logger.debug("caught exception while waiting for close latch", e);
Thread.currentThread().interrupt();
safeAwait(closeLatch);
} finally {
logger.debug("closing sockets on [{}]", portToBind);
IOUtils.closeWhileHandlingException(openedSockets);
Expand Down