Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class CalculatedStatus {
private final double percent;

/**
* A status which represents a COMPLETED state at 0%
* A status which represents a COMPLETED state at 100%
*/
public static final CalculatedStatus COMPLETED = new CalculatedStatus(HostRoleStatus.COMPLETED,
HostRoleStatus.COMPLETED, 100.0);
Expand All @@ -65,6 +65,11 @@ public class CalculatedStatus {
public static final CalculatedStatus PENDING = new CalculatedStatus(HostRoleStatus.PENDING,
HostRoleStatus.PENDING, 0.0);

/**
* A status which represents an ABORTED state at -1%.
* It is constructed with {@link HostRoleStatus#ABORTED} for both status arguments and -1 as the percent.
* NOTE(review): the negative percent presumably signals "no meaningful progress" rather than a real
* completion percentage - confirm against the code that consumes the percent value.
*/
public static final CalculatedStatus ABORTED = new CalculatedStatus(HostRoleStatus.ABORTED, HostRoleStatus.ABORTED, -1);

// ----- Constructors ------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,31 +795,18 @@ private RequestStageContainer doUpdateResources(final RequestStageContainer stag
@Override
public RequestStageContainer invoke() throws AmbariException {
RequestStageContainer stageContainer = null;
int retriesRemaining = 100;
do {
try {
stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(),
runSmokeTest);
} catch (Exception e) {
if (--retriesRemaining == 0) {
LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e);
// !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500.
if (IllegalArgumentException.class.isInstance(e)) {
throw (IllegalArgumentException) e;
} else {
throw new RuntimeException("Update Host request submission failed: " + e, e);
}
} else {
LOG.info("Caught an exception while updating host components, retrying : " + e);
try {
Thread.sleep(250);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Update Host request submission failed: " + e, e);
}
}
try {
stageContainer = updateHostComponents(stages, requests, request.getRequestInfoProperties(),
runSmokeTest);
} catch (Exception e) {
LOG.info("Caught an exception while updating host components, will not try again: {}", e.getMessage(), e);
// !!! IllegalArgumentException results in a 400 response, RuntimeException results in 500.
if (e instanceof IllegalArgumentException) {
throw (IllegalArgumentException) e;
} else {
throw new RuntimeException("Update Host request submission failed: " + e, e);
}
} while (stageContainer == null);
}

return stageContainer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.apache.ambari.server.controller.internal.HostComponentResourceProvider.HOST_COMPONENT_SERVICE_NAME_PROPERTY_ID;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -72,6 +71,8 @@
import org.apache.ambari.server.utils.SecretReference;
import org.apache.commons.lang.StringUtils;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.inject.Inject;

Expand Down Expand Up @@ -127,11 +128,9 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
protected static final String HOSTS_PREDICATE = "hosts_predicate";
protected static final String ACTION_ID = "action";
protected static final String INPUTS_ID = "parameters";
protected static final String EXLUSIVE_ID = "exclusive";
protected static final String EXCLUSIVE_ID = "exclusive";
public static final String HAS_RESOURCE_FILTERS = "HAS_RESOURCE_FILTERS";
private static Set<String> pkPropertyIds =
new HashSet<String>(Arrays.asList(new String[]{
REQUEST_ID_PROPERTY_ID}));
private static final Set<String> PK_PROPERTY_IDS = ImmutableSet.of(REQUEST_ID_PROPERTY_ID);

private PredicateCompiler predicateCompiler = new PredicateCompiler();

Expand Down Expand Up @@ -418,7 +417,7 @@ public RequestStatus deleteResources(Request request, Predicate predicate)

@Override
protected Set<String> getPKPropertyIds() {
return pkPropertyIds;
return PK_PROPERTY_IDS;
}


Expand Down Expand Up @@ -476,8 +475,8 @@ private ExecuteActionRequest getActionRequest(Request request)
}

boolean exclusive = false;
if (requestInfoProperties.containsKey(EXLUSIVE_ID)) {
exclusive = Boolean.valueOf(requestInfoProperties.get(EXLUSIVE_ID).trim());
if (requestInfoProperties.containsKey(EXCLUSIVE_ID)) {
exclusive = Boolean.valueOf(requestInfoProperties.get(EXCLUSIVE_ID).trim());
}

return new ExecuteActionRequest(
Expand Down Expand Up @@ -751,7 +750,8 @@ private Resource getRequestResource(RequestEntity entity, String clusterName,
}

setResourceProperty(resource, REQUEST_ID_PROPERTY_ID, entity.getRequestId(), requestedPropertyIds);
setResourceProperty(resource, REQUEST_CONTEXT_ID, entity.getRequestContext(), requestedPropertyIds);
String requestContext = entity.getRequestContext();
setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds);
setResourceProperty(resource, REQUEST_TYPE_ID, entity.getRequestType(), requestedPropertyIds);

// Mask any sensitive data fields in the inputs data structure
Expand Down Expand Up @@ -802,15 +802,13 @@ private Resource getRequestResource(RequestEntity entity, String clusterName,
final CalculatedStatus status;
LogicalRequest logicalRequest = topologyManager.getRequest(entity.getRequestId());
if (summary.isEmpty() && null != logicalRequest) {
// In this case, it appears that there are no tasks but this is a logical
// topology request, so it's a matter of hosts simply not registering yet
// for tasks to be created ==> status = PENDING.
// For a new LogicalRequest there should be at least one HostRequest,
// while if they were removed already ==> status = COMPLETED.
if (logicalRequest.getHostRequests().isEmpty()) {
status = CalculatedStatus.COMPLETED;
} else {
status = CalculatedStatus.PENDING;
status = logicalRequest.calculateStatus();
if (status == CalculatedStatus.ABORTED) {
Optional<String> failureReason = logicalRequest.getFailureReason();
if (failureReason.isPresent()) {
requestContext += "\nFAILED: " + failureReason.get();
setResourceProperty(resource, REQUEST_CONTEXT_ID, requestContext, requestedPropertyIds);
}
}
} else {
// there are either tasks or this is not a logical request, so do normal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@
*/
package org.apache.ambari.server.orm.entities;

import java.util.Collection;

import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import javax.persistence.TableGenerator;
import java.util.Collection;

import org.apache.ambari.server.actionmanager.HostRoleStatus;

@Entity
@Table(name = "topology_host_request")
Expand All @@ -49,6 +51,13 @@ public class TopologyHostRequestEntity {
@Column(name = "host_name", length = 255)
private String hostName;

// Status of this host request; mapped to the "status" column and persisted as the enum
// constant's name (EnumType.STRING) rather than its ordinal.
@Column(name = "status")
@Enumerated(EnumType.STRING)
private HostRoleStatus status;

// Text mapped to the "status_message" column.
// NOTE(review): presumably a human-readable explanation of the status (e.g. a failure reason) - confirm against the code that sets it.
@Column(name = "status_message")
private String statusMessage;

@ManyToOne
@JoinColumn(name = "logical_request_id", referencedColumnName = "id", nullable = false)
private TopologyLogicalRequestEntity topologyLogicalRequestEntity;
Expand Down Expand Up @@ -92,6 +101,22 @@ public void setHostName(String hostName) {
this.hostName = hostName;
}

/**
* Gets the status held by this entity.
*
* @return the value of the status field; may be {@code null}, since the field has no initializer
*/
public HostRoleStatus getStatus() {
return status;
}

/**
* Sets the status held by this entity.
*
* @param status the new status value; stored as-is, without validation
*/
public void setStatus(HostRoleStatus status) {
this.status = status;
}

/**
* Gets the status message held by this entity.
*
* @return the value of the statusMessage field; may be {@code null}, since the field has no initializer
*/
public String getStatusMessage() {
return statusMessage;
}

/**
* Sets the status message held by this entity.
*
* @param statusMessage the new message; stored as-is, without validation or truncation
*/
public void setStatusMessage(String statusMessage) {
this.statusMessage = statusMessage;
}

public TopologyLogicalRequestEntity getTopologyLogicalRequestEntity() {
return topologyLogicalRequestEntity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

/**
* Callable service implementation for executing tasks asynchronously.
Expand All @@ -55,19 +57,21 @@ public class AsyncCallableService<T> implements Callable<T> {

// the delay between two consecutive execution attempts, in milliseconds
private final long retryDelay;
private final Function<Throwable, ?> onError;

public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName) {
this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1));
public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, Function<Throwable, ?> onError) {
this(task, timeout, retryDelay, taskName, Executors.newScheduledThreadPool(1), onError);
}

public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService) {
public AsyncCallableService(Callable<T> task, long timeout, long retryDelay, String taskName, ScheduledExecutorService executorService, Function<Throwable, ?> onError) {
Preconditions.checkArgument(retryDelay > 0, "retryDelay should be positive");

this.task = task;
this.executorService = executorService;
this.timeout = timeout;
this.retryDelay = retryDelay;
this.taskName = taskName;
this.onError = onError;
}

@Override
Expand All @@ -78,25 +82,29 @@ public T call() throws Exception {
LOG.info("Task {} execution started at {}", taskName, startTime);

while (true) {
Throwable lastError;
try {
LOG.debug("Task {} waiting for result at most {} ms", taskName, timeLeft);
T taskResult = future.get(timeLeft, TimeUnit.MILLISECONDS);
LOG.info("Task {} successfully completed with result: {}", taskName, taskResult);
return taskResult;
} catch (TimeoutException e) {
LOG.debug("Task {} timeout", taskName);
lastError = e;
timeLeft = 0;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwable cause = Throwables.getRootCause(e);
if (!(cause instanceof RetryTaskSilently)) {
LOG.info(String.format("Task %s exception during execution", taskName), cause);
}
lastError = cause;
timeLeft = timeout - (System.currentTimeMillis() - startTime);
}

if (timeLeft < retryDelay) {
attemptToCancel(future);
LOG.warn("Task {} timeout exceeded, no more retries", taskName);
onError.apply(lastError);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import java.util.List;
import java.util.concurrent.Executor;

import org.apache.ambari.server.topology.tasks.TopologyTask;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.topology.tasks.TopologyHostTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,17 +41,17 @@ public enum Answer {ACCEPTED, DECLINED_PREDICATE, DECLINED_DONE}
private final Answer answer;
private final String hostGroupName;
private final long hostRequestId;
private final List<TopologyTask> tasks;
private final List<TopologyHostTask> tasks;

static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List<TopologyTask> tasks) {
static HostOfferResponse createAcceptedResponse(long hostRequestId, String hostGroupName, List<TopologyHostTask> tasks) {
return new HostOfferResponse(Answer.ACCEPTED, hostRequestId, hostGroupName, tasks);
}

/**
* Creates a response that carries only an answer.
* Delegates to the full constructor with a host request id of -1 and {@code null} for both
* the host group name and the task list.
*
* @param answer the answer this response represents
*/
private HostOfferResponse(Answer answer) {
this(answer, -1, null, null);
}

private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List<TopologyTask> tasks) {
private HostOfferResponse(Answer answer, long hostRequestId, String hostGroupName, List<TopologyHostTask> tasks) {
this.answer = answer;
this.hostRequestId = hostRequestId;
this.hostGroupName = hostGroupName;
Expand Down Expand Up @@ -78,12 +79,20 @@ void executeTasks(Executor executor, final String hostName, final ClusterTopolog
executor.execute(new Runnable() {
@Override
public void run() {
for (TopologyTask task : tasks) {
LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType());
task.run();
for (TopologyHostTask task : tasks) {
try {
LOG.info("Running task for accepted host offer for hostname = {}, task = {}", hostName, task.getType());
task.run();
} catch (Exception e) {
HostRequest hostRequest = task.getHostRequest();
LOG.error("{} task for host {} failed due to", task.getType(), hostRequest.getHostName(), e);
hostRequest.markHostRequestFailed(HostRoleStatus.ABORTED, e, ambariContext.getPersistedTopologyState());
break;
}
}
}
});
}
}

}
Loading