diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java index ecd213352e2..7142e493127 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java @@ -81,8 +81,8 @@ public T call() throws Exception { Future future = executorService.submit(task); LOG.info("Task {} execution started at {}", taskName, startTime); + Throwable lastError = null; while (true) { - Throwable lastError; try { LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft); T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS); @@ -90,7 +90,9 @@ public T call() throws Exception { return taskResult; } catch (TimeoutException e) { LOG.debug("Task {} timeout", taskName); - lastError = e; + if (lastError == null) { + lastError = e; + } timeLeft = 0; } catch (ExecutionException e) { Throwable cause = Throwables.getRootCause(e); @@ -98,10 +100,10 @@ public T call() throws Exception { LOG.info(String.format("Task %s exception during execution", taskName), cause); } lastError = cause; - timeLeft = timeout - (System.currentTimeMillis() - startTime); + timeLeft = timeout - (System.currentTimeMillis() - startTime) - retryDelay; } - if (timeLeft < retryDelay) { + if (timeLeft <= 0) { attemptToCancel(future); LOG.warn("Task {} timeout exceeded, no more retries", taskName); onError.accept(lastError); @@ -124,6 +126,12 @@ private void attemptToCancel(Future future) { public static class RetryTaskSilently extends RuntimeException { // marker, throw if the task needs to be retried + public RetryTaskSilently() { + super(); + } + public RetryTaskSilently(String message) { + super(message); + } } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java index 0f13ec2eee7..ed1c451c7c9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java @@ -71,8 +71,9 @@ public Boolean call() throws Exception { Collection requiredHostGroups = getTopologyRequiredHostGroups(); if (!areHostGroupsResolved(requiredHostGroups)) { - LOG.info("Some host groups require more hosts, cluster configuration cannot begin"); - throw new AsyncCallableService.RetryTaskSilently(); + String msg = "Some host groups require more hosts, cluster configuration cannot begin"; + LOG.info(msg); + throw new AsyncCallableService.RetryTaskSilently(msg); } LOG.info("All required host groups are complete, cluster configuration can now begin"); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java index 6fdb7984088..348b8272b55 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java @@ -19,16 +19,20 @@ package org.apache.ambari.server.topology; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.captureLong; +import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import org.easymock.Capture; import org.easymock.EasyMockRule; import org.easymock.EasyMockSupport; import org.easymock.Mock; @@ -81,6 +85,32 @@ public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exce Assert.assertNull("No result expected in case of timeout", serviceResult); } + @Test + public void lastErrorIsReturnedIfSubsequentAttemptTimesOut() throws Exception { + // GIVEN + Exception computationException = new ExecutionException(new ArithmeticException("Computation error during first attempt")); + Exception timeoutException = new TimeoutException("Timeout during second attempt"); + expect(futureMock.get(TIMEOUT, TimeUnit.MILLISECONDS)).andThrow(computationException); + expect(executorServiceMock.schedule(taskMock, RETRY_DELAY, TimeUnit.MILLISECONDS)).andReturn(futureMock); + Capture timeoutCapture = Capture.newInstance(); + expect(futureMock.get(captureLong(timeoutCapture), eq(TimeUnit.MILLISECONDS))).andThrow(timeoutException); + expect(futureMock.isDone()).andReturn(Boolean.FALSE); + expect(futureMock.cancel(true)).andReturn(Boolean.TRUE); + expect(executorServiceMock.submit(taskMock)).andReturn(futureMock); + onErrorMock.accept(computationException.getCause()); + replayAll(); + + asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", executorServiceMock, onErrorMock); + + // WHEN + Boolean serviceResult = asyncCallableService.call(); + + // THEN + verifyAll(); + Assert.assertTrue(timeoutCapture.getValue() <= TIMEOUT - RETRY_DELAY); + Assert.assertNull("No result expected in case of timeout", serviceResult); + } + @Test public void testCallableServiceShouldCancelTaskWhenTaskHangsAndTimeoutExceeded() throws Exception { // GIVEN