diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java index 3c415dfe8c7..c88983b2a1b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java @@ -54,7 +54,7 @@ public class CalculatedStatus { private final double percent; /** - * A status which represents a COMPLETED state at 0% + * A status which represents a COMPLETED state at 100% */ public static final CalculatedStatus COMPLETED = new CalculatedStatus(HostRoleStatus.COMPLETED, HostRoleStatus.COMPLETED, 100.0); @@ -65,6 +65,11 @@ public class CalculatedStatus { public static final CalculatedStatus PENDING = new CalculatedStatus(HostRoleStatus.PENDING, HostRoleStatus.PENDING, 0.0); + /** + * A status which represents an ABORTED state at -1% + */ + public static final CalculatedStatus ABORTED = new CalculatedStatus(HostRoleStatus.ABORTED, HostRoleStatus.ABORTED, -1); + // ----- Constructors ------------------------------------------------------ /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java index aaf46569081..d240a848f7d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java @@ -795,31 +795,18 @@ private RequestStageContainer doUpdateResources(final RequestStageContainer stag @Override public RequestStageContainer invoke() throws AmbariException { RequestStageContainer stageContainer = null; - int retriesRemaining = 100; - do { - try { - stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(), - runSmokeTest); - } catch (Exception e) { - if (--retriesRemaining == 0) { - LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e); - // !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500. - if (IllegalArgumentException.class.isInstance(e)) { - throw (IllegalArgumentException) e; - } else { - throw new RuntimeException("Update Host request submission failed: " + e, e); - } - } else { - LOG.info("Caught an exception while updating host components, retrying : " + e); - try { - Thread.sleep(250); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Update Host request submission failed: " + e, e); - } - } + try { + stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(), + runSmokeTest); + } catch (Exception e) { + LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e); + // !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500. + if (e instanceof IllegalArgumentException) { + throw (IllegalArgumentException) e; + } else { + throw new RuntimeException("Update Host request submission failed: " + e, e); } - } while (stageContainer == null); + } return stageContainer; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java index 46b1d0c9c00..1f690152739 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java @@ -23,7 +23,6 @@ import static org.apache.ambari.server.controller.internal.HostComponentResourceProvider.HOST_COMPONENT_SERVICE_NAME_PROPERTY_ID; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -72,6 +71,8 @@ import org.apache.ambari.server.utils.SecretReference; import org.apache.commons.lang.StringUtils; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.inject.Inject; @@ -127,11 +128,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider protected static final String HOSTS_PREDICATE = "hosts_predicate"; protected static final String ACTION_ID = "action"; protected static final String INPUTS_ID = "parameters"; - protected static final String EXLUSIVE_ID = "exclusive"; + protected static final String EXCLUSIVE_ID = "exclusive"; public static final String HAS_RESOURCE_FILTERS = "HAS_RESOURCE_FILTERS"; - private static Set pkPropertyIds = - new HashSet(Arrays.asList(new String[]{ - REQUEST_ID_PROPERTY_ID})); + private static final Set PK_PROPERTY_IDS = ImmutableSet.of(REQUEST_ID_PROPERTY_ID); private PredicateCompiler predicateCompiler = new PredicateCompiler(); @@ -418,7 +417,7 @@ public RequestStatus deleteResources(Request request, Predicate predicate) @Override protected Set getPKPropertyIds() { - return pkPropertyIds; + return PK_PROPERTY_IDS; } @@ -476,8 +475,8 @@ private ExecuteActionRequest getActionRequest(Request request) } boolean exclusive = false; - if (requestInfoProperties.containsKey(EXLUSIVE_ID)) { - exclusive = Boolean.valueOf(requestInfoProperties.get(EXLUSIVE_ID).trim()); + if (requestInfoProperties.containsKey(EXCLUSIVE_ID)) { + exclusive = Boolean.valueOf(requestInfoProperties.get(EXCLUSIVE_ID).trim()); } return new ExecuteActionRequest( @@ -751,7 +750,8 @@ private Resource getRequestResource(RequestEntity entity, String clusterName, } setResourceProperty(resource, REQUEST_ID_PROPERTY_ID, entity.getRequestId(), requestedPropertyIds); - setResourceProperty(resource, REQUEST_CONTEXT_ID, entity.getRequestContext(), requestedPropertyIds); + String requestContext = entity.getRequestContext(); + setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds); setResourceProperty(resource, REQUEST_TYPE_ID, entity.getRequestType(), requestedPropertyIds); // Mask any sensitive data fields in the inputs data structure @@ -802,15 +802,13 @@ private Resource getRequestResource(RequestEntity entity, String clusterName, final CalculatedStatus status; LogicalRequest logicalRequest = topologyManager.getRequest(entity.getRequestId()); if (summary.isEmpty() && null != logicalRequest) { - // In this case, it appears that there are no tasks but this is a logical - // topology request, so it's a matter of hosts simply not registering yet - // for tasks to be created ==> status = PENDING. - // For a new LogicalRequest there should be at least one HostRequest, - // while if they were removed already ==> status = COMPLETED. - if (logicalRequest.getHostRequests().isEmpty()) { - status = CalculatedStatus.COMPLETED; - } else { - status = CalculatedStatus.PENDING; + status = logicalRequest.calculateStatus(); + if (status == CalculatedStatus.ABORTED) { + Optional failureReason = logicalRequest.getFailureReason(); + if (failureReason.isPresent()) { + requestContext += "\nFAILED: " + failureReason.get(); + setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds); + } } } else { // there are either tasks or this is not a logical request, so do normal diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java index 7abbd51606d..149130232b1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java @@ -17,11 +17,13 @@ */ package org.apache.ambari.server.orm.entities; +import java.util.Collection; + import javax.persistence.CascadeType; import javax.persistence.Column; import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; +import javax.persistence.EnumType; +import javax.persistence.Enumerated; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; @@ -29,8 +31,8 @@ import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.Table; -import javax.persistence.TableGenerator; -import java.util.Collection; + +import org.apache.ambari.server.actionmanager.HostRoleStatus; @Entity @Table(name = "topology_host_request") @@ -49,6 +51,13 @@ public class TopologyHostRequestEntity { @Column(name = "host_name", length = 255) private String hostName; + @Column(name = "status") + @Enumerated(EnumType.STRING) + private HostRoleStatus status; + + @Column(name = "status_message") + private String statusMessage; + @ManyToOne @JoinColumn(name = "logical_request_id", referencedColumnName = "id", nullable = false) private TopologyLogicalRequestEntity topologyLogicalRequestEntity; @@ -92,6 +101,22 @@ public void setHostName(String hostName) { this.hostName = hostName; } + public HostRoleStatus getStatus() { + return status; + } + + public void setStatus(HostRoleStatus status) { + this.status = status; + } + + public String getStatusMessage() { + return statusMessage; + } + + public void setStatusMessage(String statusMessage) { + this.statusMessage = statusMessage; + } + public TopologyLogicalRequestEntity getTopologyLogicalRequestEntity() { return topologyLogicalRequestEntity; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java index db57378d91b..3abb52c927d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/AsyncCallableService.java @@ -29,7 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; /** * Callable service implementation for executing tasks asynchronously. @@ -55,12 +57,13 @@ public class AsyncCallableService implements Callable { // the delay between two consecutive execution trials in milliseconds private final long retryDelay; + private final Function onError; - public AsyncCallableService(Callable task, long timeout, long retryDelay, String taskName) { - this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1)); + public AsyncCallableService(Callable task, long timeout, long retryDelay, String taskName, Function onError) { + this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1), onError); } - public AsyncCallableService(Callable task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService) { + public AsyncCallableService(Callable task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService, Function onError) { Preconditions.checkArgument(retryDelay > 0, "retryDelay should be positive"); this.task = task; @@ -68,6 +71,7 @@ public AsyncCallableService(Callable task, long timeout, long retryDelay, Str this.timeout = timeout; this.retryDelay = retryDelay; this.taskName = taskName; + this.onError = onError; } @Override @@ -78,6 +82,7 @@ public T call() throws Exception { LOG.info("Task {} execution started at {}", taskName, startTime); while (true) { + Throwable lastError; try { LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft); T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS); @@ -85,18 +90,21 @@ public T call() throws Exception { return taskResult; } catch (TimeoutException e) { LOG.debug("Task {} timeout", taskName); + lastError = e; timeLeft = 0; } catch (ExecutionException e) { - Throwable cause = e.getCause(); + Throwable cause = Throwables.getRootCause(e); if (!(cause instanceof RetryTaskSilently)) { LOG.info(String.format("Task %s exception during execution", taskName), cause); } + lastError = cause; timeLeft = timeout - (System.currentTimeMillis() - startTime); } if (timeLeft < retryDelay) { attemptToCancel(future); LOG.warn("Task {} timeout exceeded, no more retries", taskName); + onError.apply(lastError); return null; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java index e220c50858a..2c29e5da58e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostOfferResponse.java @@ -22,7 +22,8 @@ import java.util.List; import java.util.concurrent.Executor; -import org.apache.ambari.server.topology.tasks.TopologyTask; +import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.topology.tasks.TopologyHostTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +41,9 @@ public enum Answer {ACCEPTED, DECLINED_PREDICATE, DECLINED_DONE} private final Answer answer; private final String hostGroupName; private final long hostRequestId; - private final List tasks; + private final List tasks; - static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List tasks) { + static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List tasks) { return new HostOfferResponse(Answer.ACCEPTED, hostRequestId, hostGroupName, tasks); } @@ -50,7 +51,7 @@ private HostOfferResponse(Answer answer) { this(answer, -1, null, null); } - private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List tasks) { + private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List tasks) { this.answer = answer; this.hostRequestId = hostRequestId; this.hostGroupName = hostGroupName; @@ -78,12 +79,20 @@ void executeTasks(Executor executor, final String hostName, final ClusterTopolog executor.execute(new Runnable() { @Override public void run() { - for (TopologyTask task : tasks) { - LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType()); - task.run(); + for (TopologyHostTask task : tasks) { + try { + LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType()); + task.run(); + } catch (Exception e) { + HostRequest hostRequest = task.getHostRequest(); + LOG.error("{} task for host {} failed due to", task.getType(), hostRequest.getHostName(), e); + hostRequest.markHostRequestFailed(HostRoleStatus.ABORTED, e, ambariContext.getPersistedTopologyState()); + break; + } } } }); } } + } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java index 8aed8aac1b9..6659e9ed415 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/HostRequest.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.predicate.InvalidQueryException; import org.apache.ambari.server.api.predicate.PredicateCompiler; import org.apache.ambari.server.controller.internal.HostResourceProvider; @@ -45,11 +46,14 @@ import org.apache.ambari.server.topology.tasks.PersistHostResourcesTask; import org.apache.ambari.server.topology.tasks.RegisterWithConfigGroupTask; import org.apache.ambari.server.topology.tasks.StartHostTask; +import org.apache.ambari.server.topology.tasks.TopologyHostTask; import org.apache.ambari.server.topology.tasks.TopologyTask; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.google.common.base.Optional; +import com.google.common.base.Throwables; /** * Represents a set of requests to a single host such as install, start, etc. @@ -69,6 +73,8 @@ public class HostRequest implements Comparable { private final long id; private boolean isOutstanding = true; private final boolean skipFailure; + private HostRoleStatus status = HostRoleStatus.PENDING; + private String statusMessage; private Map> logicalTaskMap = new HashMap>(); @@ -77,7 +83,7 @@ public class HostRequest implements Comparable { // logical task id -> physical tasks private Map physicalTasks = new HashMap(); - private List topologyTasks = new ArrayList(); + private List topologyTasks = new ArrayList<>(); private ClusterTopology topology; @@ -119,6 +125,8 @@ public HostRequest(long requestId, long id, String predicate, hostgroupName = entity.getTopologyHostGroupEntity().getName(); hostGroup = topology.getBlueprint().getHostGroup(hostgroupName); hostname = entity.getHostName(); + setStatus(entity.getStatus()); + statusMessage = entity.getStatusMessage(); this.predicate = toPredicate(predicate); containsMaster = hostGroup.containsMasterComponent(); this.topology = topology; @@ -134,6 +142,15 @@ public HostRequest(long requestId, long id, String predicate, (hostname == null ? "Host Assignment Pending" : hostname)); } + void markHostRequestFailed(HostRoleStatus status, Throwable cause, PersistedState persistedState) { + String errorMessage = StringUtils.substringBefore(Throwables.getRootCause(cause).getMessage(), "\n"); + LOG.info("HostRequest: marking host request {} for {} as {} due to {}", id, hostname, status, errorMessage); + abortPendingTasks(); + setStatus(status); + setStatusMessage(errorMessage); + persistedState.setHostRequestStatus(id, status, errorMessage); + } + //todo: synchronization public synchronized HostOfferResponse offer(Host host) { if (!isOutstanding) { @@ -149,6 +166,24 @@ public synchronized HostOfferResponse offer(Host host) { } } + public HostRoleStatus getStatus() { + return status; + } + + public void setStatus(HostRoleStatus status) { + if (status != null) { + this.status = status; + } + } + + public void setStatusMessage(String errorMessage) { + this.statusMessage = errorMessage; + } + + public Optional getStatusMessage() { + return Optional.fromNullable(statusMessage); + } + public void setHostName(String hostName) { hostname = hostName; } @@ -307,7 +342,7 @@ private void setHostOnTasks(Host host) { } } - public List getTopologyTasks() { + public List getTopologyTasks() { return topologyTasks; } @@ -341,6 +376,9 @@ public Collection getLogicalTasks() { logicalTask.setStructuredOut(physicalTask.getStructuredOut()); } } + if (logicalTask.getStatus() == HostRoleStatus.PENDING && status != HostRoleStatus.PENDING) { + logicalTask.setStatus(status); + } } return logicalTasks.values(); } @@ -440,6 +478,14 @@ public void registerPhysicalTaskId(long logicalTaskId, long physicalTaskId) { getLogicalTask(logicalTaskId).incrementAttemptCount(); } + public void abortPendingTasks() { + for (HostRoleCommand command : getLogicalTasks()) { + if (command.getStatus() == HostRoleStatus.PENDING) { + command.setStatus(HostRoleStatus.ABORTED); + } + } + } + private Predicate toPredicate(String predicate) { Predicate compiledPredicate = null; try { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java index 6df9bc78c7f..aa7c21ef78c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java @@ -38,6 +38,7 @@ import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; import org.apache.ambari.server.controller.ShortTaskStatus; +import org.apache.ambari.server.controller.internal.CalculatedStatus; import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO; import org.apache.ambari.server.orm.entities.StageEntity; import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity; @@ -49,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; @@ -405,6 +407,40 @@ public Set removeHostRequestByHostName(String hostName) { return removed; } + /** + * @return true if all the tasks in the logical request are in completed state, false otherwise + */ + public boolean isFinished() { + for (ShortTaskStatus ts : getRequestStatus().getTasks()) { + if (!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) { + return false; + } + } + return true; + } + + /** + * Returns if all the tasks in the logical request have completed state. + */ + public boolean isSuccessful() { + for (ShortTaskStatus ts : getRequestStatus().getTasks()) { + if (HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) { + return false; + } + } + return true; + } + + public Optional getFailureReason() { + for (HostRequest request : getHostRequests()) { + Optional failureReason = request.getStatusMessage(); + if (failureReason.isPresent()) { + return failureReason; + } + } + return Optional.absent(); + } + private void createHostRequests(TopologyRequest request, ClusterTopology topology) { Map hostGroupInfoMap = request.getHostGroupInfo(); Blueprint blueprint = topology.getBlueprint(); @@ -523,4 +559,12 @@ private synchronized static AmbariManagementController getController() { } return controller; } + + public CalculatedStatus calculateStatus() { + return !isFinished() + ? CalculatedStatus.PENDING + : isSuccessful() + ? CalculatedStatus.COMPLETED + : CalculatedStatus.ABORTED; + } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java index 8d55c16b74d..b61c02a221c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedState.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.state.Host; @@ -86,4 +87,8 @@ public interface PersistedState { */ void removeHostRequests(long logicalRequestId, Collection hostRequests); + /** + * Update the status of the given host request. + */ + void setHostRequestStatus(long hostRequestId, HostRoleStatus status, String message); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java index 1def4df4756..52fcb6a8748 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/PersistedStateImpl.java @@ -28,6 +28,7 @@ import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleCommand; +import org.apache.ambari.server.actionmanager.HostRoleStatus; import org.apache.ambari.server.api.predicate.InvalidQueryException; import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.orm.dao.HostDAO; @@ -141,6 +142,16 @@ public void removeHostRequests(long logicalRequestId, Collection ho } } + @Override + public void setHostRequestStatus(long hostRequestId, HostRoleStatus status, String message) { + TopologyHostRequestEntity hostRequestEntity = hostRequestDAO.findById(hostRequestId); + if (hostRequestEntity != null) { + hostRequestEntity.setStatus(status); + hostRequestEntity.setStatusMessage(message); + hostRequestDAO.merge(hostRequestEntity); + } + } + @Override public void registerPhysicalTask(long logicalTaskId, long physicalTaskId) { TopologyLogicalTaskEntity entity = topologyLogicalTaskDAO.findById(logicalTaskId); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java index 7eb88cfa03b..511d538641e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java @@ -33,7 +33,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; import javax.inject.Inject; import org.apache.ambari.server.AmbariException; @@ -43,7 +45,6 @@ import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RequestStatusResponse; -import org.apache.ambari.server.controller.ShortTaskStatus; import org.apache.ambari.server.controller.internal.ArtifactResourceProvider; import org.apache.ambari.server.controller.internal.BaseClusterRequest; import org.apache.ambari.server.controller.internal.CalculatedStatus; @@ -81,6 +82,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Function; import com.google.common.eventbus.Subscribe; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; @@ -337,7 +339,7 @@ public LogicalRequest call() throws Exception { clusterTopologyMap.put(clusterId, topology); - addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true, + addClusterConfigRequest(logicalRequest, topology, new ClusterConfigurationRequest(ambariContext, topology, true, stackAdvisorBlueprintProcessor, securityType == SecurityType.KERBEROS)); // Process the logical request @@ -1051,9 +1053,17 @@ private void replayRequests(Map> persisted if (!configChecked) { configChecked = true; if (!ambariContext.isTopologyResolved(topology.getClusterId())) { - LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request"); - addClusterConfigRequest(topology, new ClusterConfigurationRequest( - ambariContext, topology, false, stackAdvisorBlueprintProcessor)); + if (provisionRequest == null) { + LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request missing, skipping cluster config request"); + } else if (provisionRequest.isFinished()) { + LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, but provision request is finished, skipping cluster config request"); + } else { + LOG.info("TopologyManager.replayRequests: no config with TOPOLOGY_RESOLVED found, adding cluster config request"); + ClusterConfigurationRequest configRequest = new ClusterConfigurationRequest(ambariContext, topology, false, stackAdvisorBlueprintProcessor); + addClusterConfigRequest(provisionRequest, topology, configRequest); + } + } else { + getOrCreateTopologyTaskExecutor(topology.getClusterId()).start(); } } } @@ -1061,36 +1071,17 @@ private void replayRequests(Map> persisted } /** - * @param logicalRequest * @return true if all the tasks in the logical request are in completed state, false otherwise */ private boolean isLogicalRequestFinished(LogicalRequest logicalRequest) { - if(logicalRequest != null) { - boolean completed = true; - for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) { - if(!HostRoleStatus.valueOf(ts.getStatus()).isCompletedState()) { - completed = false; - } - } - return completed; - } - return false; + return logicalRequest != null && logicalRequest.isFinished(); } /** * Returns if all the tasks in the logical request have completed state. - * @param logicalRequest - * @return */ private boolean isLogicalRequestSuccessful(LogicalRequest logicalRequest) { - if(logicalRequest != null) { - for(ShortTaskStatus ts : logicalRequest.getRequestStatus().getTasks()) { - if(HostRoleStatus.valueOf(ts.getStatus()) != HostRoleStatus.COMPLETED) { - return false; - } - } - } - return true; + return logicalRequest != null && logicalRequest.isSuccessful(); } //todo: this should invoke a callback on each 'service' in the topology @@ -1121,9 +1112,20 @@ private void addKerberosClient(ClusterTopology topology) { * @param topology cluster topology * @param configurationRequest configuration request to be executed */ - private void addClusterConfigRequest(ClusterTopology topology, ClusterConfigurationRequest configurationRequest) { + private void addClusterConfigRequest(final LogicalRequest logicalRequest, ClusterTopology topology, ClusterConfigurationRequest configurationRequest) { ConfigureClusterTask task = configureClusterTaskFactory.createConfigureClusterTask(topology, configurationRequest, ambariEventPublisher); - executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask")); + Function onConfigureClusterError = new Function() { + @Nullable @Override + public Void apply(Throwable input) { + HostRoleStatus status = input instanceof TimeoutException ? HostRoleStatus.TIMEDOUT : HostRoleStatus.FAILED; + LOG.info("ConfigureClusterTask failed, marking host requests {}", status); + for (HostRequest hostRequest : logicalRequest.getHostRequests()) { + hostRequest.markHostRequestFailed(status, input, persistedState); + } + return null; + } + }; + executor.submit(new AsyncCallableService<>(task, task.getTimeout(), task.getRepeatDelay(),"ConfigureClusterTask", onConfigureClusterError)); } /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java index 82a2f6e2f67..3db02e22bf6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyHostTask.java @@ -39,6 +39,10 @@ public TopologyHostTask(ClusterTopology topology, HostRequest hostRequest) { this.hostRequest = hostRequest; } + public HostRequest getHostRequest() { + return hostRequest; + } + /** * Run with an InternalAuthenticationToken as when running these tasks we might not have any active security context. */ diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java index 0753c3da222..6f2d7f5b624 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/TopologyTask.java @@ -19,6 +19,12 @@ package org.apache.ambari.server.topology.tasks; +import java.util.Set; + +import org.apache.ambari.server.RoleCommand; + +import com.google.common.collect.ImmutableSet; + /** * Task which is executed by the TopologyManager. */ @@ -26,11 +32,23 @@ public interface TopologyTask extends Runnable { /** * Task type. */ - public enum Type { + enum Type { RESOURCE_CREATION, CONFIGURE, INSTALL, - START + START { + @Override + public Set tasksToAbortOnFailure() { + return ImmutableSet.of(RoleCommand.START); + } + }, + ; + + private static Set ALL_TASKS = ImmutableSet.of(RoleCommand.INSTALL, RoleCommand.START); + + public Set tasksToAbortOnFailure() { + return ALL_TASKS; + } } /** @@ -38,5 +56,5 @@ public enum Type { * * @return the type of task */ - public Type getType(); + Type getType(); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java index 2ab8dc8e545..fb7ce22567b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java @@ -190,6 +190,7 @@ protected void configure() { catalogBinder.addBinding().to(UpgradeCatalog252.class); catalogBinder.addBinding().to(UpgradeCatalog260.class); catalogBinder.addBinding().to(UpgradeCatalog261.class); + catalogBinder.addBinding().to(UpgradeCatalog262.class); catalogBinder.addBinding().to(UpdateAlertScriptPaths.class); catalogBinder.addBinding().to(FinalUpgradeCatalog.class); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java new file mode 100644 index 00000000000..f83204d8106 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog262.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.upgrade; + +import java.sql.SQLException; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.orm.DBAccessor; + +import com.google.inject.Inject; +import com.google.inject.Injector; + +/** + * The {@link UpgradeCatalog262} upgrades Ambari from 2.6.1 to 2.6.2. + */ +public class UpgradeCatalog262 extends AbstractUpgradeCatalog { + + private static final String HOST_REQUEST_TABLE = "topology_host_request"; + private static final String STATUS_COLUMN = "status"; + private static final String STATUS_MESSAGE_COLUMN = "status_message"; + + @Inject + public UpgradeCatalog262(Injector injector) { + super(injector); + } + + @Override + public String getSourceVersion() { + return "2.6.1"; + } + + @Override + public String getTargetVersion() { + return "2.6.2"; + } + + @Override + protected void executeDDLUpdates() throws AmbariException, SQLException { + addHostRequestStatusColumn(); + } + + private void addHostRequestStatusColumn() throws SQLException { + dbAccessor.addColumn(HOST_REQUEST_TABLE, new DBAccessor.DBColumnInfo(STATUS_COLUMN, String.class, 255, null, true)); + dbAccessor.addColumn(HOST_REQUEST_TABLE, new DBAccessor.DBColumnInfo(STATUS_MESSAGE_COLUMN, String.class, 1024, null, true)); + } + + @Override + protected void executePreDMLUpdates() throws AmbariException, SQLException { + } + + @Override + protected void executeDMLUpdates() throws AmbariException, SQLException { + } + +} diff --git a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql index 058fa2a3c60..cf6fe745040 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql @@ -756,6 +756,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql index ec692b8b150..fe1e7ffcd45 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql @@ -773,6 +773,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql index 65be45925c9..5d162589d45 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql @@ -752,6 +752,8 @@ CREATE TABLE topology_host_request ( group_id NUMBER(19) NOT NULL, stage_id NUMBER(19) NOT NULL, host_name VARCHAR(255), + status VARCHAR2(255), + status_message VARCHAR2(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql index 122fd8be128..ab403157111 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql @@ -754,6 +754,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql index 1a98f3b7024..28bbd440432 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql @@ -750,6 +750,8 @@ CREATE TABLE topology_host_request ( group_id NUMERIC(19) NOT NULL, stage_id NUMERIC(19) NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql index 3d48a86083f..49fc473b247 100644 --- a/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql +++ b/ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql @@ -771,6 +771,8 @@ CREATE TABLE topology_host_request ( group_id BIGINT NOT NULL, stage_id BIGINT NOT NULL, host_name VARCHAR(255), + status VARCHAR(255), + status_message VARCHAR(1024), CONSTRAINT PK_topology_host_request PRIMARY KEY CLUSTERED (id), CONSTRAINT FK_hostreq_group_id FOREIGN KEY (group_id) REFERENCES topology_hostgroup(id), CONSTRAINT FK_hostreq_logicalreq_id FOREIGN KEY (logical_request_id) REFERENCES topology_logical_request(id)); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java index fb9c4fdef35..4e7dfebe632 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/RequestResourceProviderTest.java @@ -28,11 +28,13 @@ import static org.powermock.api.easymock.PowerMock.createMock; import static org.powermock.api.easymock.PowerMock.createNiceMock; import static org.powermock.api.easymock.PowerMock.replay; +import static org.powermock.api.easymock.PowerMock.replayAll; import static org.powermock.api.easymock.PowerMock.reset; +import static org.powermock.api.easymock.PowerMock.resetAll; import static org.powermock.api.easymock.PowerMock.verify; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -99,7 +101,11 @@ import org.springframework.security.core.Authentication; import org.springframework.security.core.context.SecurityContextHolder; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; @@ -1187,9 +1193,7 @@ public void testCreateResourcesForCommandWithHostPredicate() throws Exception { capture(requestCapture), capture(predicateCapture))).andReturn(Collections.singleton(resource)); // replay - replay(managementController, response, controller, - hostComponentProcessResourceProvider, resource, clusters); - PowerMock.replayAll(); + replayAll(); SecurityContextHolder.getContext().setAuthentication( TestAuthenticationFactory.createAdministrator()); @@ -1664,127 +1668,108 @@ public void testRequestStatusWithNoTasks() throws Exception { } /** - * Tests that topology requests return different status (PENDING) if there are - * no tasks. Normal requests should return COMPLETED. - * - * @throws Exception + * Tests that if there are no tasks, topology requests return status they get from the logical request. */ @Test @PrepareForTest(AmbariServer.class) public void testGetLogicalRequestStatusWithNoTasks() throws Exception { - // Given - Resource.Type type = Resource.Type.Request; - - AmbariManagementController managementController = createMock(AmbariManagementController.class); - ActionManager actionManager = createNiceMock(ActionManager.class); - - Clusters clusters = createNiceMock(Clusters.class); - - RequestEntity requestMock = createNiceMock(RequestEntity.class); - - expect(requestMock.getRequestContext()).andReturn("this is a context").anyTimes(); - expect(requestMock.getRequestId()).andReturn(100L).anyTimes(); - Capture> requestIdsCapture = Capture.newInstance(); - - - ClusterTopology topology = createNiceMock(ClusterTopology.class); - - HostGroup hostGroup = createNiceMock(HostGroup.class); - expect(hostGroup.getName()).andReturn("host_group_1").anyTimes(); - - Blueprint blueprint = createNiceMock(Blueprint.class); - expect(blueprint.getHostGroup("host_group_1")).andReturn(hostGroup).anyTimes(); - expect(topology.getClusterId()).andReturn(2L).anyTimes(); - - Long clusterId = 2L; - String clusterName = "cluster1"; - Cluster cluster = createNiceMock(Cluster.class); - expect(cluster.getClusterId()).andReturn(clusterId).anyTimes(); - expect(cluster.getClusterName()).andReturn(clusterName).anyTimes(); - - expect(managementController.getActionManager()).andReturn(actionManager).anyTimes(); - expect(managementController.getClusters()).andReturn(clusters).anyTimes(); - expect(clusters.getCluster(eq(clusterName))).andReturn(cluster).anyTimes(); - expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes(); - expect(requestDAO.findByPks(capture(requestIdsCapture), eq(true))).andReturn(Lists.newArrayList(requestMock)); - expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn( - Collections.emptyMap()).anyTimes(); - - Map hostGroupInfoMap = new HashMap<>(); - HostGroupInfo hostGroupInfo = new HostGroupInfo("host_group_1"); - hostGroupInfo.setRequestedCount(1); - hostGroupInfoMap.put("host_group_1", hostGroupInfo); - - TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class); - expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes(); - expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); - expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes(); - - - - PowerMock.mockStatic(AmbariServer.class); - expect(AmbariServer.getController()).andReturn(managementController).anyTimes(); - - PowerMock.replayAll( - topologyRequest, - topology, - blueprint, - managementController, - clusters); - - - LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class); - Collection hostRequests = new ArrayList<>(); - HostRequest hostRequest = createNiceMock(HostRequest.class); - hostRequests.add(hostRequest); - expect(logicalRequest.getHostRequests()).andReturn(hostRequests).anyTimes(); - expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes(); - - reset(topologyManager); - - expect(topologyManager.getRequest(100L)).andReturn(logicalRequest).anyTimes(); - - - expect(topologyManager.getRequests(eq(Collections.singletonList(100L)))).andReturn( - Collections.singletonList(logicalRequest)).anyTimes(); - expect(topologyManager.getStageSummaries(EasyMock.anyObject())).andReturn( - Collections.emptyMap()).anyTimes(); - - replay(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest, hostRequest); - - ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( - type, - PropertyHelper.getPropertyIds(type), - PropertyHelper.getKeyPropertyIds(type), - managementController); - - Set propertyIds = ImmutableSet.of( - RequestResourceProvider.REQUEST_ID_PROPERTY_ID, - RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID, - RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID - ); - - Predicate predicate = new PredicateBuilder(). - property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID).equals("100"). - toPredicate(); - - Request request = PropertyHelper.getReadRequest(propertyIds); - - // When - Set resources = provider.getResources(request, predicate); - - // Then - - - // verify - PowerMock.verifyAll(); - verify(actionManager, requestMock, requestDAO, hrcDAO, topologyManager, logicalRequest, hostRequest); - - Assert.assertEquals(1, resources.size()); - for (Resource resource : resources) { - Assert.assertEquals(100L, (long)(Long) resource.getPropertyValue(RequestResourceProvider.REQUEST_ID_PROPERTY_ID)); - Assert.assertEquals("PENDING", resource.getPropertyValue(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID)); - Assert.assertEquals(0.0, resource.getPropertyValue(RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID)); + Iterable statusList = ImmutableList.of(CalculatedStatus.COMPLETED, CalculatedStatus.PENDING, CalculatedStatus.ABORTED); + for (CalculatedStatus calculatedStatus : statusList) { + // Given + resetAll(); + + PowerMock.mockStatic(AmbariServer.class); + AmbariManagementController managementController = createMock(AmbariManagementController.class); + ActionManager actionManager = createNiceMock(ActionManager.class); + Clusters clusters = createNiceMock(Clusters.class); + Cluster cluster = createNiceMock(Cluster.class); + RequestEntity requestMock = createNiceMock(RequestEntity.class); + Blueprint blueprint = createNiceMock(Blueprint.class); + ClusterTopology topology = createNiceMock(ClusterTopology.class); + HostGroup hostGroup = createNiceMock(HostGroup.class); + TopologyRequest topologyRequest = createNiceMock(TopologyRequest.class); + LogicalRequest logicalRequest = createNiceMock(LogicalRequest.class); + HostRequest hostRequest = createNiceMock(HostRequest.class); + + Long requestId = 100L; + Long clusterId = 2L; + String clusterName = "cluster1"; + String hostGroupName = "host_group_1"; + HostGroupInfo hostGroupInfo = new HostGroupInfo(hostGroupName); + hostGroupInfo.setRequestedCount(1); + Map hostGroupInfoMap = ImmutableMap.of(hostGroupName, hostGroupInfo); + Collection hostRequests = Collections.singletonList(hostRequest); + Map dtoMap = Collections.emptyMap(); + + expect(AmbariServer.getController()).andReturn(managementController).anyTimes(); + expect(requestMock.getRequestContext()).andReturn("this is a context").anyTimes(); + expect(requestMock.getRequestId()).andReturn(requestId).anyTimes(); + expect(hostGroup.getName()).andReturn(hostGroupName).anyTimes(); + expect(blueprint.getHostGroup(hostGroupName)).andReturn(hostGroup).anyTimes(); + expect(topology.getClusterId()).andReturn(2L).anyTimes(); + expect(cluster.getClusterId()).andReturn(clusterId).anyTimes(); + expect(cluster.getClusterName()).andReturn(clusterName).anyTimes(); + expect(managementController.getActionManager()).andReturn(actionManager).anyTimes(); + expect(managementController.getClusters()).andReturn(clusters).anyTimes(); + expect(clusters.getCluster(eq(clusterName))).andReturn(cluster).anyTimes(); + expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes(); + Collection requestIds = anyObject(); + expect(requestDAO.findByPks(requestIds, eq(true))).andReturn(Lists.newArrayList(requestMock)); + expect(hrcDAO.findAggregateCounts((Long) anyObject())).andReturn(dtoMap).anyTimes(); + expect(topologyManager.getRequest(requestId)).andReturn(logicalRequest).anyTimes(); + expect(topologyManager.getRequests(eq(Collections.singletonList(requestId)))).andReturn(Collections.singletonList(logicalRequest)).anyTimes(); + expect(topologyManager.getStageSummaries(EasyMock.anyObject())).andReturn(dtoMap).anyTimes(); + + expect(topologyRequest.getHostGroupInfo()).andReturn(hostGroupInfoMap).anyTimes(); + expect(topology.getBlueprint()).andReturn(blueprint).anyTimes(); + expect(blueprint.shouldSkipFailure()).andReturn(true).anyTimes(); + + expect(logicalRequest.getHostRequests()).andReturn(hostRequests).anyTimes(); + expect(logicalRequest.constructNewPersistenceEntity()).andReturn(requestMock).anyTimes(); + expect(logicalRequest.calculateStatus()).andReturn(calculatedStatus).anyTimes(); + Optional failureReason = calculatedStatus == CalculatedStatus.ABORTED + ? Optional.of("some reason") + : Optional.absent(); + expect(logicalRequest.getFailureReason()).andReturn(failureReason).anyTimes(); + + replayAll(); + + Resource.Type type = Resource.Type.Request; + ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( + type, + PropertyHelper.getPropertyIds(type), + PropertyHelper.getKeyPropertyIds(type), + managementController + ); + + Set propertyIds = ImmutableSet.of( + RequestResourceProvider.REQUEST_ID_PROPERTY_ID, + RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID, + RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID, + RequestResourceProvider.REQUEST_CONTEXT_ID + ); + + Predicate predicate = new PredicateBuilder(). + property(RequestResourceProvider.REQUEST_ID_PROPERTY_ID).equals("100"). + toPredicate(); + + Request request = PropertyHelper.getReadRequest(propertyIds); + + // When + Set resources = provider.getResources(request, predicate); + + // Then + verifyAll(); + + Assert.assertEquals(1, resources.size()); + Resource resource = Iterables.getOnlyElement(resources); + Assert.assertEquals(requestId, resource.getPropertyValue(RequestResourceProvider.REQUEST_ID_PROPERTY_ID)); + Assert.assertEquals(calculatedStatus.getStatus().toString(), resource.getPropertyValue(RequestResourceProvider.REQUEST_STATUS_PROPERTY_ID)); + Assert.assertEquals(calculatedStatus.getPercent(), resource.getPropertyValue(RequestResourceProvider.REQUEST_PROGRESS_PERCENT_ID)); + + Object requestContext = resource.getPropertyValue(RequestResourceProvider.REQUEST_CONTEXT_ID); + Assert.assertNotNull(requestContext); + Assert.assertTrue(!failureReason.isPresent() || requestContext.toString().contains(failureReason.get())); } } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java index febe5917191..bd5dde6268e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersTest.java @@ -79,7 +79,7 @@ import org.apache.ambari.server.topology.PersistedState; import org.apache.ambari.server.topology.TopologyManager; import org.apache.ambari.server.topology.TopologyRequest; -import org.apache.ambari.server.topology.tasks.TopologyTask; +import org.apache.ambari.server.topology.tasks.TopologyHostTask; import org.apache.ambari.server.utils.EventBusSynchronizer; import org.junit.After; import org.junit.Before; @@ -650,7 +650,7 @@ private HostRequest createHostRequest(long hrId, String hostName) { expect(hr.getHostgroupName()).andReturn("MyHostGroup").anyTimes(); expect(hr.getHostName()).andReturn(hostName).anyTimes(); expect(hr.getStageId()).andReturn(1L); - expect(hr.getTopologyTasks()).andReturn(Collections.emptyList()); + expect(hr.getTopologyTasks()).andReturn(Collections.emptyList()); replay(hr); return hr; diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java index bf8fd791b25..3930e2e6ce5 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/AsyncCallableServiceTest.java @@ -18,6 +18,7 @@ package org.apache.ambari.server.topology; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.expect; import java.util.concurrent.Callable; @@ -34,6 +35,8 @@ import org.junit.Rule; import org.junit.Test; +import com.google.common.base.Function; + public class AsyncCallableServiceTest extends EasyMockSupport { private static final long TIMEOUT = 1000; // default timeout @@ -51,19 +54,24 @@ public class AsyncCallableServiceTest extends EasyMockSupport { @Mock private ScheduledFuture futureMock; + @Mock + private Function onErrorMock; + private AsyncCallableService asyncCallableService; @Test public void testCallableServiceShouldCancelTaskWhenTimeoutExceeded() throws Exception { // GIVEN long timeout = -1; // guaranteed timeout - expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(new TimeoutException("Testing the timeout exceeded case")); + TimeoutException timeoutException = new TimeoutException("Testing the timeout exceeded case"); + expect(futureMock.get(timeout, TimeUnit.MILLISECONDS)).andThrow(timeoutException); expect(futureMock.isDone()).andReturn(Boolean.FALSE); expect(futureMock.cancel(true)).andReturn(Boolean.TRUE); expect(executorServiceMock.submit(taskMock)).andReturn(futureMock); + expect(onErrorMock.apply(timeoutException)).andReturn(null); replayAll(); - asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock); + asyncCallableService = new AsyncCallableService<>(taskMock, timeout, RETRY_DELAY, "test", executorServiceMock, onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); @@ -84,13 +92,16 @@ public Boolean call() throws Exception { return false; } }; + expect(onErrorMock.apply(anyObject(TimeoutException.class))).andReturn(null); + replayAll(); - asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(hangingTask, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); // THEN + verifyAll(); Assert.assertNull("No result expected from hanging task", serviceResult); } @@ -98,8 +109,9 @@ public Boolean call() throws Exception { public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception { // GIVEN expect(taskMock.call()).andReturn(Boolean.TRUE); + expect(onErrorMock.apply(anyObject(TimeoutException.class))).andThrow(new AssertionError("No error expected")).anyTimes(); replayAll(); - asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); @@ -113,8 +125,9 @@ public void testCallableServiceShouldExitWhenTaskCompleted() throws Exception { public void testCallableServiceShouldRetryTaskExecutionTillTimeoutExceededWhenTaskThrowsException() throws Exception { // GIVEN expect(taskMock.call()).andThrow(new IllegalStateException("****************** TESTING ****************")).times(2, 3); + expect(onErrorMock.apply(anyObject(IllegalStateException.class))).andReturn(null).once(); replayAll(); - asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(taskMock, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); @@ -135,13 +148,16 @@ public Boolean call() throws Exception { throw new IllegalStateException("****************** TESTING ****************"); } }; + expect(onErrorMock.apply(anyObject(IllegalStateException.class))).andReturn(null).once(); + replayAll(); - asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test"); + asyncCallableService = new AsyncCallableService<>(throwingTask, TIMEOUT, RETRY_DELAY, "test", onErrorMock); // WHEN Boolean serviceResult = asyncCallableService.call(); // THEN + verifyAll(); Assert.assertNull("No result expected from throwing task", serviceResult); } } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java index 11f571bd7de..f5ac79535da 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java @@ -35,6 +35,8 @@ import org.junit.Rule; import org.junit.Test; +import com.google.common.base.Functions; + /** * Unit test for the ConfigureClusterTask class. * As business methods of this class don't return values, the assertions are made by verifying method calls on mocks. @@ -93,7 +95,7 @@ public void testsShouldConfigureClusterTaskExecuteWhenCalledFromAsyncCallableSer clusterConfigurationRequest.process(); replayAll(); - AsyncCallableService asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test"); + AsyncCallableService asyncService = new AsyncCallableService<>(testSubject, 5000, 500, "test", Functions.identity()); // WHEN asyncService.call(); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java index 07bb9877165..0c556f5d396 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/TopologyManagerTest.java @@ -403,6 +403,7 @@ public void testAddKerberosClientAtTopologyInit() throws Exception { List requestList = new ArrayList<>(); requestList.add(logicalRequest); expect(logicalRequest.hasPendingHostRequests()).andReturn(false).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(false).anyTimes(); allRequests.put(clusterTopologyMock, requestList); expect(requestStatusResponse.getTasks()).andReturn(Collections.emptyList()).anyTimes(); expect(clusterTopologyMock.isClusterKerberosEnabled()).andReturn(true); @@ -437,6 +438,8 @@ public void testBlueprintRequestCompletion() throws Exception { expect(persistedState.getAllRequests()).andReturn(Collections.>emptyMap()).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(true).anyTimes(); + expect(logicalRequest.isSuccessful()).andReturn(true).anyTimes(); replayAll(); topologyManager.provisionCluster(request); requestFinished(); @@ -460,6 +463,8 @@ public void testBlueprintRequestCompletion__Failure() throws Exception { expect(persistedState.getAllRequests()).andReturn(Collections.>emptyMap()).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(true).anyTimes(); + expect(logicalRequest.isSuccessful()).andReturn(false).anyTimes(); replayAll(); topologyManager.provisionCluster(request); requestFinished(); @@ -483,6 +488,7 @@ public void testBlueprintRequestCompletion__InProgress() throws Exception { expect(persistedState.getAllRequests()).andReturn(Collections.>emptyMap()).anyTimes(); expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(false).anyTimes(); replayAll(); topologyManager.provisionCluster(request); requestFinished(); @@ -523,6 +529,7 @@ public void testBlueprintRequestCompletion__Replay() throws Exception { expect(persistedState.getProvisionRequest(CLUSTER_ID)).andReturn(logicalRequest).anyTimes(); expect(logicalRequest.hasPendingHostRequests()).andReturn(true).anyTimes(); expect(logicalRequest.getCompletedHostRequests()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(logicalRequest.isFinished()).andReturn(true).anyTimes(); expect(requestStatusResponse.getTasks()).andReturn(tasks).anyTimes(); replayAll(); EasyMock.replay(clusterTopologyMock);