Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,29 @@ public T call() throws Exception {
Future<T> future = executorService.submit(task);
LOG.info("Task {} execution started at {}", taskName, startTime);

Throwable lastError = null;
while (true) {
Throwable lastError;
try {
LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
LOG.info("Task {} successfully completed with result: {}", taskName, taskResult);
return taskResult;
} catch (TimeoutException e) {
LOG.debug("Task {} timeout", taskName);
lastError = e;
if (lastError == null) {
lastError = e;
}
timeLeft = 0;
} catch (ExecutionException e) {
Throwable cause = Throwables.getRootCause(e);
if (!(cause instanceof RetryTaskSilently)) {
LOG.info(String.format("Task %s exception during execution", taskName), cause);
}
lastError = cause;
timeLeft = timeout - (System.currentTimeMillis() - startTime);
timeLeft = timeout - (System.currentTimeMillis() - startTime) - retryDelay;
}

if (timeLeft < retryDelay) {
if (timeLeft <= 0) {
attemptToCancel(future);
LOG.warn("Task {} timeout exceeded, no more retries", taskName);
onError.accept(lastError);
Expand All @@ -124,6 +126,12 @@ private void attemptToCancel(Future<?> future) {

public static class RetryTaskSilently extends RuntimeException {
// marker, throw if the task needs to be retried
public RetryTaskSilently() {
super();
}
public RetryTaskSilently(String message) {
super(message);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ public Boolean call() throws Exception {
Collection<String> requiredHostGroups = getTopologyRequiredHostGroups();

if (!areHostGroupsResolved(requiredHostGroups)) {
LOG.info("Some host groups require more hosts, cluster configuration cannot begin");
throw new AsyncCallableService.RetryTaskSilently();
String msg = "Some host groups require more hosts, cluster configuration cannot begin";
LOG.info(msg);
throw new AsyncCallableService.RetryTaskSilently(msg);
}

LOG.info("All required host groups are complete, cluster configuration can now begin");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package org.apache.ambari.server.topology;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.captureLong;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import org.easymock.Capture;
import org.easymock.EasyMockRule;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
Expand Down Expand Up @@ -81,6 +85,32 @@ public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exce
Assert.assertNull("No result expected in case of timeout", serviceResult);
}

@Test
public void lastErrorIsReturnedIfSubsequentAttemptTimesOut() throws Exception {
// GIVEN
Exception computationException = new ExecutionException(new ArithmeticException("Computation error during first attempt"));
Exception timeoutException = new TimeoutException("Timeout during second attempt");
expect(futureMock.get(TIMEOUT, TimeUnit.MILLISECONDS)).andThrow(computationException);
expect(executorServiceMock.schedule(taskMock, RETRY_DELAY, TimeUnit.MILLISECONDS)).andReturn(futureMock);
Capture<Long> timeoutCapture = Capture.newInstance();
expect(futureMock.get(captureLong(timeoutCapture), eq(TimeUnit.MILLISECONDS))).andThrow(timeoutException);
expect(futureMock.isDone()).andReturn(Boolean.FALSE);
expect(futureMock.cancel(true)).andReturn(Boolean.TRUE);
expect(executorServiceMock.submit(taskMock)).andReturn(futureMock);
onErrorMock.accept(computationException.getCause());
replayAll();

asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", executorServiceMock, onErrorMock);

// WHEN
Boolean serviceResult = asyncCallableService.call();

// THEN
verifyAll();
Assert.assertTrue(timeoutCapture.getValue() <= TIMEOUT - RETRY_DELAY);
Assert.assertNull("No result expected in case of timeout", serviceResult);
}

@Test
public void testCallableServiceShouldCancelTaskWhenTaskHangsAndTimeoutExceeded() throws Exception {
// GIVEN
Expand Down