Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.spi.prerequisites.QueryPrerequisites;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.DataSize;
Expand Down Expand Up @@ -185,21 +186,22 @@ private void queueQuery()
public void startWaitingForResources()
{
if (stateMachine.transitionToWaitingForResources()) {
waitForMinimumWorkers();
waitForMinimumCoordinatorSidecarsAndWorkers();
}
}

private void waitForMinimumWorkers()
private void waitForMinimumCoordinatorSidecarsAndWorkers()
{
ListenableFuture<?> minimumWorkerFuture = clusterSizeMonitor.waitForMinimumWorkers();
// when worker requirement is met, wait for query execution to finish construction and then start the execution
addSuccessCallback(minimumWorkerFuture, () -> {
ListenableFuture<?> minimumResourcesFuture = Futures.allAsList(
clusterSizeMonitor.waitForMinimumCoordinatorSidecars(),
clusterSizeMonitor.waitForMinimumWorkers());
// when worker and sidecar requirement is met, wait for query execution to finish construction and then start the execution
addSuccessCallback(minimumResourcesFuture, () -> {
// It's the time to end waiting for resources
boolean isDispatching = stateMachine.transitionToDispatching();
addSuccessCallback(queryExecutionFuture, queryExecution -> startExecution(queryExecution, isDispatching));
});

addExceptionCallback(minimumWorkerFuture, throwable -> queryExecutor.execute(() -> fail(throwable)));
addExceptionCallback(minimumResourcesFuture, throwable -> queryExecutor.execute(() -> fail(throwable)));
}

private void startExecution(QueryExecution queryExecution, boolean isDispatching)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.AllNodes;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
Expand All @@ -29,13 +30,17 @@
import javax.inject.Inject;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;

import static com.facebook.airlift.concurrent.Threads.threadsNamed;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static com.facebook.presto.spi.StandardErrorCode.NO_CPP_SIDECARS;
import static com.facebook.presto.spi.StandardErrorCode.TOO_MANY_SIDECARS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.lang.String.format;
Expand All @@ -53,10 +58,13 @@ public class ClusterSizeMonitor
private final int coordinatorMinCount;
private final int coordinatorMinCountActive;
private final Duration coordinatorMaxWait;

private final Duration coordinatorSidecarMaxWait;
private final int resourceManagerMinCountActive;
private final ScheduledExecutorService executor;

private final Consumer<AllNodes> listener = this::updateAllNodes;
private final boolean isCoordinatorSidecarEnabled;

@GuardedBy("this")
private int currentWorkerCount;
Expand All @@ -67,14 +75,20 @@ public class ClusterSizeMonitor
@GuardedBy("this")
private int currentResourceManagerCount;

@GuardedBy("this")
private int currentCoordinatorSidecarCount;

@GuardedBy("this")
private final List<SettableFuture<?>> workerSizeFutures = new ArrayList<>();

@GuardedBy("this")
private final List<SettableFuture<?>> coordinatorSizeFutures = new ArrayList<>();

@GuardedBy("this")
private final List<SettableFuture<?>> coordinatorSidecarSizeFutures = new ArrayList<>();

@Inject
public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, QueryManagerConfig queryManagerConfig, NodeResourceStatusConfig nodeResourceStatusConfig)
public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, QueryManagerConfig queryManagerConfig, NodeResourceStatusConfig nodeResourceStatusConfig, ServerConfig serverConfig)
{
this(
nodeManager,
Expand All @@ -85,7 +99,9 @@ public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig n
queryManagerConfig.getRequiredCoordinators(),
nodeResourceStatusConfig.getRequiredCoordinatorsActive(),
queryManagerConfig.getRequiredCoordinatorsMaxWait(),
nodeResourceStatusConfig.getRequiredResourceManagersActive());
queryManagerConfig.getRequiredCoordinatorSidecarsMaxWait(),
nodeResourceStatusConfig.getRequiredResourceManagersActive(),
serverConfig.isCoordinatorSidecarEnabled());
}

public ClusterSizeMonitor(
Expand All @@ -97,7 +113,9 @@ public ClusterSizeMonitor(
int coordinatorMinCount,
int coordinatorMinCountActive,
Duration coordinatorMaxWait,
int resourceManagerMinCountActive)
Duration coordinatorSidecarMaxWait,
int resourceManagerMinCountActive,
boolean isCoordinatorSidecarEnabled)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.includeCoordinator = includeCoordinator;
Expand All @@ -111,9 +129,11 @@ public ClusterSizeMonitor(
checkArgument(coordinatorMinCountActive >= 0, "coordinatorMinCountActive is negative");
this.coordinatorMinCountActive = coordinatorMinCountActive;
this.coordinatorMaxWait = requireNonNull(coordinatorMaxWait, "coordinatorMaxWait is null");
this.coordinatorSidecarMaxWait = requireNonNull(coordinatorSidecarMaxWait, "coordinatorSidecarMaxWait is null");
checkArgument(resourceManagerMinCountActive >= 0, "resourceManagerMinCountActive is negative");
this.resourceManagerMinCountActive = resourceManagerMinCountActive;
this.executor = newSingleThreadScheduledExecutor(threadsNamed("node-monitor-%s"));
this.isCoordinatorSidecarEnabled = isCoordinatorSidecarEnabled;
}

@PostConstruct
Expand All @@ -140,7 +160,6 @@ public boolean hasRequiredWorkers()
}

/**
*
* @return true when the current resource manager count is greater or equals to
* minimum resource manager count for Coordinator.
*/
Expand All @@ -150,7 +169,6 @@ public boolean hasRequiredResourceManagers()
}

/**
*
* @return true when the current coordinator count in a cluster is greater or equals to
* minimum coordinator count for a given Coordinator.
*/
Expand All @@ -159,6 +177,19 @@ public boolean hasRequiredCoordinators()
return currentCoordinatorCount >= coordinatorMinCountActive;
}

/**
 * Checks whether exactly one coordinator sidecar is currently active.
 *
 * @return true when exactly one coordinator sidecar is active, false when none is active
 * @throws PrestoException with {@code TOO_MANY_SIDECARS} when more than one coordinator sidecar is active
 */
public boolean hasRequiredCoordinatorSidecars()
{
    // Read the counter once so the check, the error message and the returned value
    // all describe the same observation even if the count is updated concurrently.
    int activeSidecars = currentCoordinatorSidecarCount;
    if (activeSidecars > 1) {
        throw new PrestoException(TOO_MANY_SIDECARS,
                format("Expected a single active coordinator sidecar. Found %s active coordinator sidecars", activeSidecars));
    }
    return activeSidecars == 1;
}

/**
* Returns a listener that completes when the minimum number of workers for the cluster has been met.
* Note: caller should not add a listener using the direct executor, as this can delay the
Expand Down Expand Up @@ -224,6 +255,36 @@ public synchronized ListenableFuture<?> waitForMinimumCoordinators()
return future;
}

/**
 * Returns a future that completes once exactly one coordinator sidecar is active.
 * The future is already complete when coordinator sidecars are disabled or when exactly
 * one sidecar is active at call time. Otherwise it is registered in
 * {@code coordinatorSidecarSizeFutures} and a timeout is scheduled after
 * {@code coordinatorSidecarMaxWait}.
 *
 * @return a future that completes with null on success, or fails with a {@link PrestoException}
 * ({@code NO_CPP_SIDECARS} when no sidecar is active, {@code TOO_MANY_SIDECARS} when more
 * than one is active) if the requirement is not met when the timeout fires
 */
public synchronized ListenableFuture<?> waitForMinimumCoordinatorSidecars()
{
    if (!isCoordinatorSidecarEnabled || currentCoordinatorSidecarCount == 1) {
        return immediateFuture(null);
    }

    SettableFuture<?> future = SettableFuture.create();
    coordinatorSidecarSizeFutures.add(future);

    // if future does not finish in wait period, complete it according to the count observed at that time
    ScheduledFuture<?> timeoutTask = executor.schedule(
            () -> {
                synchronized (this) {
                    int activeSidecars = currentCoordinatorSidecarCount;
                    if (activeSidecars == 1) {
                        // The requirement was met but the completion queued on this executor
                        // has not run yet; do not fail a wait that has already been satisfied.
                        future.set(null);
                    }
                    else if (activeSidecars > 1) {
                        future.setException(new PrestoException(
                                TOO_MANY_SIDECARS,
                                format("Expected a single active coordinator sidecar. Found %s active coordinator sidecars", activeSidecars)));
                    }
                    else {
                        future.setException(new PrestoException(
                                NO_CPP_SIDECARS,
                                format("Insufficient active coordinator sidecar nodes. Waited %s for at least 1 coordinator sidecars, but only 0 coordinator sidecars are active", coordinatorSidecarMaxWait)));
                    }
                }
            },
            coordinatorSidecarMaxWait.toMillis(),
            MILLISECONDS);

    // remove future if finished (e.g., canceled, timed out)
    future.addListener(() -> {
        timeoutTask.cancel(true);
        removeCoordinatorSidecarFuture(future);
    }, executor);

    return future;
}

private synchronized void removeWorkerFuture(SettableFuture<?> future)
{
workerSizeFutures.remove(future);
Expand All @@ -234,20 +295,26 @@ private synchronized void removeCoordinatorFuture(SettableFuture<?> future)
coordinatorSizeFutures.remove(future);
}

/**
 * Removes the given future from {@code coordinatorSidecarSizeFutures}.
 * Called from the completion listener registered in {@code waitForMinimumCoordinatorSidecars()}.
 *
 * @param future the future to stop tracking
 */
private synchronized void removeCoordinatorSidecarFuture(SettableFuture<?> future)
{
coordinatorSidecarSizeFutures.remove(future);
}

private synchronized void updateAllNodes(AllNodes allNodes)
{
if (includeCoordinator) {
currentWorkerCount = allNodes.getActiveNodes().size();
}
else {
currentWorkerCount = Sets.difference(
Sets.difference(
allNodes.getActiveNodes(),
allNodes.getActiveCoordinators()),
allNodes.getActiveResourceManagers()).size();
Set<Node> activeNodes = new HashSet<>(allNodes.getActiveNodes());
activeNodes.removeAll(allNodes.getActiveCoordinators());
activeNodes.removeAll(allNodes.getActiveResourceManagers());
activeNodes.removeAll(allNodes.getActiveCoordinatorSidecars());
currentWorkerCount = activeNodes.size();
}
currentCoordinatorCount = allNodes.getActiveCoordinators().size();
currentResourceManagerCount = allNodes.getActiveResourceManagers().size();
currentCoordinatorSidecarCount = allNodes.getActiveCoordinatorSidecars().size();
if (currentWorkerCount >= workerMinCount) {
List<SettableFuture<?>> listeners = ImmutableList.copyOf(workerSizeFutures);
workerSizeFutures.clear();
Expand All @@ -258,5 +325,10 @@ private synchronized void updateAllNodes(AllNodes allNodes)
coordinatorSizeFutures.clear();
executor.submit(() -> listeners.forEach(listener -> listener.set(null)));
}
if (currentCoordinatorSidecarCount == 1) {
List<SettableFuture<?>> listeners = ImmutableList.copyOf(coordinatorSidecarSizeFutures);
coordinatorSidecarSizeFutures.clear();
executor.submit(() -> listeners.forEach(listener -> listener.set(null)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class QueryManagerConfig
private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES);
private int requiredCoordinators = 1;
private Duration requiredCoordinatorsMaxWait = new Duration(5, TimeUnit.MINUTES);
private Duration requiredCoordinatorSidecarsMaxWait = new Duration(5, TimeUnit.MINUTES);
private int requiredResourceManagers = 1;

private int querySubmissionMaxThreads = Runtime.getRuntime().availableProcessors() * 2;
Expand Down Expand Up @@ -604,6 +605,21 @@ public QueryManagerConfig setRequiredCoordinatorsMaxWait(Duration requiredCoordi
return this;
}

/**
 * Returns the maximum time to wait for coordinator sidecars before a query is failed.
 * The field is initialized to 5 minutes.
 *
 * @return the configured maximum wait
 */
@NotNull
public Duration getRequiredCoordinatorSidecarsMaxWait()
{
return requiredCoordinatorSidecarsMaxWait;
}

/**
 * Sets the maximum time to wait for coordinator sidecars before a query is failed.
 * Bound to the {@code query-manager.experimental.required-coordinator-sidecars-max-wait} property.
 *
 * @param requiredCoordinatorSidecarsMaxWait the new maximum wait
 * @return this config instance, so setters can be chained
 */
@Experimental
@Config("query-manager.experimental.required-coordinator-sidecars-max-wait")
@ConfigDescription("Maximum time to wait for minimum number of coordinator sidecars before the query is failed")
public QueryManagerConfig setRequiredCoordinatorSidecarsMaxWait(Duration requiredCoordinatorSidecarsMaxWait)
{
this.requiredCoordinatorSidecarsMaxWait = requiredCoordinatorSidecarsMaxWait;
return this;
}

@Min(1)
public int getQuerySubmissionMaxThreads()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class AllNodes
private final Set<InternalNode> activeCoordinators;
private final Set<InternalNode> activeResourceManagers;
private final Set<InternalNode> activeCatalogServers;
private final Set<InternalNode> activeCoordinatorSidecars;
private final int activeWorkerCount;

public AllNodes(
Expand All @@ -37,14 +38,16 @@ public AllNodes(
Set<InternalNode> shuttingDownNodes,
Set<InternalNode> activeCoordinators,
Set<InternalNode> activeResourceManagers,
Set<InternalNode> activeCatalogServers)
Set<InternalNode> activeCatalogServers,
Set<InternalNode> activeCoordinatorSidecars)
{
this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null"));
this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null"));
this.shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
this.activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
this.activeResourceManagers = ImmutableSet.copyOf(requireNonNull(activeResourceManagers, "activeResourceManagers is null"));
this.activeCatalogServers = ImmutableSet.copyOf(requireNonNull(activeCatalogServers, "activeCatalogServers is null"));
this.activeCoordinatorSidecars = ImmutableSet.copyOf(requireNonNull(activeCoordinatorSidecars, "activeCoordinatorSidecars is null"));

this.activeWorkerCount = Sets.difference(Sets.difference(activeNodes, activeResourceManagers), activeCatalogServers).size();
}
Expand Down Expand Up @@ -84,6 +87,11 @@ public Set<InternalNode> getActiveCatalogServers()
return activeCatalogServers;
}

/**
 * Returns the active coordinator sidecar nodes captured when this object was constructed.
 * The constructor stores them as an immutable copy.
 *
 * @return the set of active coordinator sidecars
 */
public Set<InternalNode> getActiveCoordinatorSidecars()
{
return activeCoordinatorSidecars;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -99,12 +107,13 @@ public boolean equals(Object o)
Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) &&
Objects.equals(activeCoordinators, allNodes.activeCoordinators) &&
Objects.equals(activeResourceManagers, allNodes.activeResourceManagers) &&
Objects.equals(activeCatalogServers, allNodes.activeCatalogServers);
Objects.equals(activeCatalogServers, allNodes.activeCatalogServers) &&
Objects.equals(activeCoordinatorSidecars, allNodes.activeCoordinatorSidecars);
}

/**
 * Computes the hash from the node sets, including {@code activeCoordinatorSidecars},
 * so that it stays consistent with {@code equals}, which also compares that field.
 */
@Override
public int hashCode()
{
    return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators, activeResourceManagers, activeCatalogServers, activeCoordinatorSidecars);
}
}
Loading