diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java index dc93153ea27f4..4babf8a95575e 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/LocalDispatchQuery.java @@ -31,6 +31,7 @@ import com.facebook.presto.spi.prerequisites.QueryPrerequisites; import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext; import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.DataSize; @@ -185,21 +186,22 @@ private void queueQuery() public void startWaitingForResources() { if (stateMachine.transitionToWaitingForResources()) { - waitForMinimumWorkers(); + waitForMinimumCoordinatorSidecarsAndWorkers(); } } - private void waitForMinimumWorkers() + private void waitForMinimumCoordinatorSidecarsAndWorkers() { - ListenableFuture minimumWorkerFuture = clusterSizeMonitor.waitForMinimumWorkers(); - // when worker requirement is met, wait for query execution to finish construction and then start the execution - addSuccessCallback(minimumWorkerFuture, () -> { + ListenableFuture minimumResourcesFuture = Futures.allAsList( + clusterSizeMonitor.waitForMinimumCoordinatorSidecars(), + clusterSizeMonitor.waitForMinimumWorkers()); + // when worker and sidecar requirement is met, wait for query execution to finish construction and then start the execution + addSuccessCallback(minimumResourcesFuture, () -> { // It's the time to end waiting for resources boolean isDispatching = stateMachine.transitionToDispatching(); addSuccessCallback(queryExecutionFuture, queryExecution -> startExecution(queryExecution, isDispatching)); }); - - 
addExceptionCallback(minimumWorkerFuture, throwable -> queryExecutor.execute(() -> fail(throwable))); + addExceptionCallback(minimumResourcesFuture, throwable -> queryExecutor.execute(() -> fail(throwable))); } private void startExecution(QueryExecution queryExecution, boolean isDispatching) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/ClusterSizeMonitor.java b/presto-main/src/main/java/com/facebook/presto/execution/ClusterSizeMonitor.java index c07e2aa36d6d6..396d6d575f2d3 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/ClusterSizeMonitor.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/ClusterSizeMonitor.java @@ -16,9 +16,10 @@ import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; import com.facebook.presto.metadata.AllNodes; import com.facebook.presto.metadata.InternalNodeManager; +import com.facebook.presto.server.ServerConfig; +import com.facebook.presto.spi.Node; import com.facebook.presto.spi.PrestoException; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.Duration; @@ -29,13 +30,17 @@ import javax.inject.Inject; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import static com.facebook.airlift.concurrent.Threads.threadsNamed; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; +import static com.facebook.presto.spi.StandardErrorCode.NO_CPP_SIDECARS; +import static com.facebook.presto.spi.StandardErrorCode.TOO_MANY_SIDECARS; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.immediateFuture; import static 
java.lang.String.format; @@ -53,10 +58,13 @@ public class ClusterSizeMonitor private final int coordinatorMinCount; private final int coordinatorMinCountActive; private final Duration coordinatorMaxWait; + + private final Duration coordinatorSidecarMaxWait; private final int resourceManagerMinCountActive; private final ScheduledExecutorService executor; private final Consumer listener = this::updateAllNodes; + private final boolean isCoordinatorSidecarEnabled; @GuardedBy("this") private int currentWorkerCount; @@ -67,14 +75,20 @@ public class ClusterSizeMonitor @GuardedBy("this") private int currentResourceManagerCount; + @GuardedBy("this") + private int currentCoordinatorSidecarCount; + @GuardedBy("this") private final List> workerSizeFutures = new ArrayList<>(); @GuardedBy("this") private final List> coordinatorSizeFutures = new ArrayList<>(); + @GuardedBy("this") + private final List> coordinatorSidecarSizeFutures = new ArrayList<>(); + @Inject - public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, QueryManagerConfig queryManagerConfig, NodeResourceStatusConfig nodeResourceStatusConfig) + public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig nodeSchedulerConfig, QueryManagerConfig queryManagerConfig, NodeResourceStatusConfig nodeResourceStatusConfig, ServerConfig serverConfig) { this( nodeManager, @@ -85,7 +99,9 @@ public ClusterSizeMonitor(InternalNodeManager nodeManager, NodeSchedulerConfig n queryManagerConfig.getRequiredCoordinators(), nodeResourceStatusConfig.getRequiredCoordinatorsActive(), queryManagerConfig.getRequiredCoordinatorsMaxWait(), - nodeResourceStatusConfig.getRequiredResourceManagersActive()); + queryManagerConfig.getRequiredCoordinatorSidecarsMaxWait(), + nodeResourceStatusConfig.getRequiredResourceManagersActive(), + serverConfig.isCoordinatorSidecarEnabled()); } public ClusterSizeMonitor( @@ -97,7 +113,9 @@ public ClusterSizeMonitor( int coordinatorMinCount, int 
coordinatorMinCountActive, Duration coordinatorMaxWait, - int resourceManagerMinCountActive) + Duration coordinatorSidecarMaxWait, + int resourceManagerMinCountActive, + boolean isCoordinatorSidecarEnabled) { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.includeCoordinator = includeCoordinator; @@ -111,9 +129,11 @@ public ClusterSizeMonitor( checkArgument(coordinatorMinCountActive >= 0, "coordinatorMinCountActive is negative"); this.coordinatorMinCountActive = coordinatorMinCountActive; this.coordinatorMaxWait = requireNonNull(coordinatorMaxWait, "coordinatorMaxWait is null"); + this.coordinatorSidecarMaxWait = requireNonNull(coordinatorSidecarMaxWait, "coordinatorSidecarMaxWait is null"); checkArgument(resourceManagerMinCountActive >= 0, "resourceManagerMinCountActive is negative"); this.resourceManagerMinCountActive = resourceManagerMinCountActive; this.executor = newSingleThreadScheduledExecutor(threadsNamed("node-monitor-%s")); + this.isCoordinatorSidecarEnabled = isCoordinatorSidecarEnabled; } @PostConstruct @@ -140,7 +160,6 @@ public boolean hasRequiredWorkers() } /** - * * @return true when the current resource manager count is greater or equals to * minimum resource manager count for Coordinator. */ @@ -150,7 +169,6 @@ public boolean hasRequiredResourceManagers() } /** - * * @return true when the current coordinator count in a cluster is greater or equals to * minimum coordinator count for a given Coordinator. */ @@ -159,6 +177,19 @@ public boolean hasRequiredCoordinators() return currentCoordinatorCount >= coordinatorMinCountActive; } + /** + * @return true when exactly one coordinator sidecar is active in the cluster, false when none is active; + * throws PrestoException (TOO_MANY_SIDECARS) when more than one coordinator sidecar is active. + */ + public boolean hasRequiredCoordinatorSidecars() + { + if (currentCoordinatorSidecarCount > 1) { + throw new PrestoException(TOO_MANY_SIDECARS, + format("Expected a single active coordinator sidecar. 
Found %s active coordinator sidecars", currentCoordinatorSidecarCount)); + } + return currentCoordinatorSidecarCount == 1; + } + /** * Returns a listener that completes when the minimum number of workers for the cluster has been met. * Note: caller should not add a listener using the direct executor, as this can delay the @@ -224,6 +255,36 @@ public synchronized ListenableFuture waitForMinimumCoordinators() return future; } + public synchronized ListenableFuture waitForMinimumCoordinatorSidecars() + { + if (currentCoordinatorSidecarCount == 1 || !isCoordinatorSidecarEnabled) { + return immediateFuture(null); + } + + SettableFuture future = SettableFuture.create(); + coordinatorSidecarSizeFutures.add(future); + + // if future does not finish in wait period, complete with an exception + ScheduledFuture timeoutTask = executor.schedule( + () -> { + synchronized (this) { + future.setException(new PrestoException( + NO_CPP_SIDECARS, + format("Insufficient active coordinator sidecar nodes. Waited %s for at least 1 coordinator sidecars, but only 0 coordinator sidecars are active", coordinatorSidecarMaxWait))); + } + }, + coordinatorSidecarMaxWait.toMillis(), + MILLISECONDS); + + // remove future if finished (e.g., canceled, timed out) + future.addListener(() -> { + timeoutTask.cancel(true); + removeCoordinatorSidecarFuture(future); + }, executor); + + return future; + } + private synchronized void removeWorkerFuture(SettableFuture future) { workerSizeFutures.remove(future); @@ -234,20 +295,26 @@ private synchronized void removeCoordinatorFuture(SettableFuture future) coordinatorSizeFutures.remove(future); } + private synchronized void removeCoordinatorSidecarFuture(SettableFuture future) + { + coordinatorSidecarSizeFutures.remove(future); + } + private synchronized void updateAllNodes(AllNodes allNodes) { if (includeCoordinator) { currentWorkerCount = allNodes.getActiveNodes().size(); } else { - currentWorkerCount = Sets.difference( - Sets.difference( - 
allNodes.getActiveNodes(), - allNodes.getActiveCoordinators()), - allNodes.getActiveResourceManagers()).size(); + Set activeNodes = new HashSet<>(allNodes.getActiveNodes()); + activeNodes.removeAll(allNodes.getActiveCoordinators()); + activeNodes.removeAll(allNodes.getActiveResourceManagers()); + activeNodes.removeAll(allNodes.getActiveCoordinatorSidecars()); + currentWorkerCount = activeNodes.size(); } currentCoordinatorCount = allNodes.getActiveCoordinators().size(); currentResourceManagerCount = allNodes.getActiveResourceManagers().size(); + currentCoordinatorSidecarCount = allNodes.getActiveCoordinatorSidecars().size(); if (currentWorkerCount >= workerMinCount) { List> listeners = ImmutableList.copyOf(workerSizeFutures); workerSizeFutures.clear(); @@ -258,5 +325,10 @@ private synchronized void updateAllNodes(AllNodes allNodes) coordinatorSizeFutures.clear(); executor.submit(() -> listeners.forEach(listener -> listener.set(null))); } + if (currentCoordinatorSidecarCount == 1) { + List> listeners = ImmutableList.copyOf(coordinatorSidecarSizeFutures); + coordinatorSidecarSizeFutures.clear(); + executor.submit(() -> listeners.forEach(listener -> listener.set(null))); + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index dfd28ff6909cf..4a60332718bb6 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -86,6 +86,7 @@ public class QueryManagerConfig private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES); private int requiredCoordinators = 1; private Duration requiredCoordinatorsMaxWait = new Duration(5, TimeUnit.MINUTES); + private Duration requiredCoordinatorSidecarsMaxWait = new Duration(5, TimeUnit.MINUTES); private int requiredResourceManagers = 1; private int 
querySubmissionMaxThreads = Runtime.getRuntime().availableProcessors() * 2; @@ -604,6 +605,21 @@ public QueryManagerConfig setRequiredCoordinatorsMaxWait(Duration requiredCoordi return this; } + @NotNull + public Duration getRequiredCoordinatorSidecarsMaxWait() + { + return requiredCoordinatorSidecarsMaxWait; + } + + @Experimental + @Config("query-manager.experimental.required-coordinator-sidecars-max-wait") + @ConfigDescription("Maximum time to wait for minimum number of coordinator sidecars before the query is failed") + public QueryManagerConfig setRequiredCoordinatorSidecarsMaxWait(Duration requiredCoordinatorSidecarsMaxWait) + { + this.requiredCoordinatorSidecarsMaxWait = requiredCoordinatorSidecarsMaxWait; + return this; + } + @Min(1) public int getQuerySubmissionMaxThreads() { diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java b/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java index c2e4d6ba53bcf..4bf2ff182387b 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/AllNodes.java @@ -29,6 +29,7 @@ public class AllNodes private final Set activeCoordinators; private final Set activeResourceManagers; private final Set activeCatalogServers; + private final Set activeCoordinatorSidecars; private final int activeWorkerCount; public AllNodes( @@ -37,7 +38,8 @@ public AllNodes( Set shuttingDownNodes, Set activeCoordinators, Set activeResourceManagers, - Set activeCatalogServers) + Set activeCatalogServers, + Set activeCoordinatorSidecars) { this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null")); this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null")); @@ -45,6 +47,7 @@ public AllNodes( this.activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null")); this.activeResourceManagers = 
ImmutableSet.copyOf(requireNonNull(activeResourceManagers, "activeResourceManagers is null")); this.activeCatalogServers = ImmutableSet.copyOf(requireNonNull(activeCatalogServers, "activeCatalogServers is null")); + this.activeCoordinatorSidecars = ImmutableSet.copyOf(requireNonNull(activeCoordinatorSidecars, "activeCoordinatorSidecars is null")); this.activeWorkerCount = Sets.difference(Sets.difference(activeNodes, activeResourceManagers), activeCatalogServers).size(); } @@ -84,6 +87,11 @@ public Set getActiveCatalogServers() return activeCatalogServers; } + public Set getActiveCoordinatorSidecars() + { + return activeCoordinatorSidecars; + } + @Override public boolean equals(Object o) { @@ -99,12 +107,13 @@ public boolean equals(Object o) Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) && Objects.equals(activeCoordinators, allNodes.activeCoordinators) && Objects.equals(activeResourceManagers, allNodes.activeResourceManagers) && - Objects.equals(activeCatalogServers, allNodes.activeCatalogServers); + Objects.equals(activeCatalogServers, allNodes.activeCatalogServers) && + Objects.equals(activeCoordinatorSidecars, allNodes.activeCoordinatorSidecars); } @Override public int hashCode() { - return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators, activeResourceManagers, activeCatalogServers); + return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators, activeResourceManagers, activeCatalogServers, activeCoordinatorSidecars); } } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java index cae0cc98e0ed8..f27476089370f 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/DiscoveryNodeManager.java @@ -127,6 +127,9 @@ public final class DiscoveryNodeManager @GuardedBy("this") 
private Set catalogServers; + @GuardedBy("this") + private Set coordinatorSidecar; + @GuardedBy("this") private final List> listeners = new ArrayList<>(); @@ -179,6 +182,7 @@ private static InternalNode findCurrentNode(List allServices, isCoordinator(service), isResourceManager(service), isCatalogServer(service), + isCoordinatorSidecar(service), ALIVE, raftPort, poolType); @@ -290,6 +294,7 @@ private synchronized void refreshNodesInternal() ImmutableSet.Builder coordinatorsBuilder = ImmutableSet.builder(); ImmutableSet.Builder resourceManagersBuilder = ImmutableSet.builder(); ImmutableSet.Builder catalogServersBuilder = ImmutableSet.builder(); + ImmutableSet.Builder coordinatorSidecarBuilder = ImmutableSet.builder(); ImmutableSetMultimap.Builder byConnectorIdBuilder = ImmutableSetMultimap.builder(); Map nodes = new HashMap<>(); SetMultimap connectorIdsByNodeId = HashMultimap.create(); @@ -313,11 +318,11 @@ private synchronized void refreshNodesInternal() boolean coordinator = isCoordinator(service); boolean resourceManager = isResourceManager(service); boolean catalogServer = isCatalogServer(service); + boolean coordinatorSidecar = isCoordinatorSidecar(service); OptionalInt raftPort = getRaftPort(service); if (uri != null && nodeVersion != null) { - InternalNode node = new InternalNode(service.getNodeId(), uri, thriftPort, nodeVersion, coordinator, resourceManager, catalogServer, ALIVE, raftPort, getPoolType(service)); + InternalNode node = new InternalNode(service.getNodeId(), uri, thriftPort, nodeVersion, coordinator, resourceManager, catalogServer, coordinatorSidecar, ALIVE, raftPort, getPoolType(service)); NodeState nodeState = getNodeState(node); - switch (nodeState) { case ACTIVE: activeNodesBuilder.add(node); @@ -330,6 +335,9 @@ private synchronized void refreshNodesInternal() if (catalogServer) { catalogServersBuilder.add(node); } + if (coordinatorSidecar) { + coordinatorSidecarBuilder.add(node); + } nodes.put(node.getNodeIdentifier(), node); @@ -382,7 
+390,7 @@ private synchronized void refreshNodesInternal() InternalNode deadNode = nodes.get(nodeId); Set deadNodeConnectorIds = connectorIdsByNodeId.get(nodeId); for (ConnectorId id : deadNodeConnectorIds) { - byConnectorIdBuilder.put(id, new InternalNode(deadNode.getNodeIdentifier(), deadNode.getInternalUri(), deadNode.getThriftPort(), deadNode.getNodeVersion(), deadNode.isCoordinator(), deadNode.isResourceManager(), deadNode.isCatalogServer(), DEAD, deadNode.getRaftPort(), deadNode.getPoolType())); + byConnectorIdBuilder.put(id, new InternalNode(deadNode.getNodeIdentifier(), deadNode.getInternalUri(), deadNode.getThriftPort(), deadNode.getNodeVersion(), deadNode.isCoordinator(), deadNode.isResourceManager(), deadNode.isCatalogServer(), deadNode.isCoordinatorSidecar(), DEAD, deadNode.getRaftPort(), deadNode.getPoolType())); } } } @@ -396,7 +404,8 @@ private synchronized void refreshNodesInternal() shuttingDownNodesBuilder.build(), coordinatorsBuilder.build(), resourceManagersBuilder.build(), - catalogServersBuilder.build()); + catalogServersBuilder.build(), + coordinatorSidecarBuilder.build()); // only update if all nodes actually changed (note: this does not include the connectors registered with the nodes) if (!allNodes.equals(this.allNodes)) { // assign allNodes to a local variable for use in the callback below @@ -404,6 +413,7 @@ private synchronized void refreshNodesInternal() coordinators = coordinatorsBuilder.build(); resourceManagers = resourceManagersBuilder.build(); catalogServers = catalogServersBuilder.build(); + coordinatorSidecar = coordinatorSidecarBuilder.build(); // notify listeners List> listeners = ImmutableList.copyOf(this.listeners); @@ -526,6 +536,12 @@ public synchronized Set getCatalogServers() return catalogServers; } + @Override + public synchronized Set getCoordinatorSidecars() + { + return coordinatorSidecar; + } + @Override public synchronized void addNodeChangeListener(Consumer listener) { @@ -601,6 +617,11 @@ private static boolean 
isCatalogServer(ServiceDescriptor service) return Boolean.parseBoolean(service.getProperties().get("catalog_server")); } + private static boolean isCoordinatorSidecar(ServiceDescriptor service) + { + return Boolean.parseBoolean(service.getProperties().get("sidecar")); + } + /** * The predicate filters out the services to allow selecting relevant nodes * for discovery and sending heart beat. @@ -613,14 +634,15 @@ private static boolean isCatalogServer(ServiceDescriptor service) */ private Predicate filterRelevantNodes() { - if (currentNode.isCoordinator() || currentNode.isResourceManager() || currentNode.isCatalogServer()) { + if (currentNode.isCoordinator() || currentNode.isResourceManager() || currentNode.isCatalogServer() || currentNode.isCoordinatorSidecar()) { // Allowing coordinator node in the list of services, even if it's not allowed by nodeStatusService with currentNode check return service -> !nodeStatusService.isPresent() || nodeStatusService.get().isAllowed(service.getLocation()) - || isCatalogServer(service); + || isCatalogServer(service) + || isCoordinatorSidecar(service); } - return service -> isResourceManager(service) || isCatalogServer(service); + return service -> isResourceManager(service) || isCatalogServer(service) || isCoordinatorSidecar(service); } } diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/InMemoryNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/InMemoryNodeManager.java index 053b6cae3001f..d99d860f59aa8 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/InMemoryNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/InMemoryNodeManager.java @@ -122,7 +122,8 @@ public AllNodes getAllNodes() shuttingDownNodesBuilder.build(), concat(Stream.of(localNode), remoteNodes.values().stream()).collect(toImmutableSet()).stream().filter(InternalNode::isCoordinator).collect(toImmutableSet()), concat(Stream.of(localNode), 
remoteNodes.values().stream()).collect(toImmutableSet()).stream().filter(InternalNode::isResourceManager).collect(toImmutableSet()), - concat(Stream.of(localNode), remoteNodes.values().stream()).collect(toImmutableSet()).stream().filter(InternalNode::isCatalogServer).collect(toImmutableSet())); + concat(Stream.of(localNode), remoteNodes.values().stream()).collect(toImmutableSet()).stream().filter(InternalNode::isCatalogServer).collect(toImmutableSet()), + concat(Stream.of(localNode), remoteNodes.values().stream()).collect(toImmutableSet()).stream().filter(InternalNode::isCoordinatorSidecar).collect(toImmutableSet())); } @Override @@ -156,6 +157,12 @@ public Set getCatalogServers() return getAllNodes().getActiveCatalogServers(); } + @Override + public Set getCoordinatorSidecars() + { + return getAllNodes().getActiveCoordinatorSidecars(); + } + @Override public void refreshNodes() { diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/InternalNode.java b/presto-main/src/main/java/com/facebook/presto/metadata/InternalNode.java index 9b4dcb4510a61..29593eb2baa66 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/InternalNode.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/InternalNode.java @@ -68,24 +68,25 @@ public int getStatusCode() private final boolean coordinator; private final boolean resourceManager; private final boolean catalogServer; + private final boolean coordinatorSidecar; private final NodeStatus nodeStatus; private final OptionalInt raftPort; private final NodePoolType poolType; public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator) { - this(nodeIdentifier, internalUri, nodeVersion, coordinator, false, false); + this(nodeIdentifier, internalUri, nodeVersion, coordinator, false, false, false); } - public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator, boolean resourceManager, boolean catalogServer) + 
public InternalNode(String nodeIdentifier, URI internalUri, NodeVersion nodeVersion, boolean coordinator, boolean resourceManager, boolean catalogServer, boolean coordinatorSidecar) { - this(nodeIdentifier, internalUri, OptionalInt.empty(), nodeVersion, coordinator, resourceManager, catalogServer, ALIVE, OptionalInt.empty(), DEFAULT); + this(nodeIdentifier, internalUri, OptionalInt.empty(), nodeVersion, coordinator, resourceManager, catalogServer, coordinatorSidecar, ALIVE, OptionalInt.empty(), DEFAULT); } @ThriftConstructor - public InternalNode(String nodeIdentifier, URI internalUri, OptionalInt thriftPort, String nodeVersion, boolean coordinator, boolean resourceManager, boolean catalogServer) + public InternalNode(String nodeIdentifier, URI internalUri, OptionalInt thriftPort, String nodeVersion, boolean coordinator, boolean resourceManager, boolean catalogServer, boolean coordinatorSidecar) { - this(nodeIdentifier, internalUri, thriftPort, new NodeVersion(nodeVersion), coordinator, resourceManager, catalogServer, ALIVE, OptionalInt.empty(), DEFAULT); + this(nodeIdentifier, internalUri, thriftPort, new NodeVersion(nodeVersion), coordinator, resourceManager, catalogServer, coordinatorSidecar, ALIVE, OptionalInt.empty(), DEFAULT); } public InternalNode( @@ -96,6 +97,7 @@ public InternalNode( boolean coordinator, boolean resourceManager, boolean catalogServer, + boolean coordinatorSidecar, NodeStatus nodeStatus, OptionalInt raftPort, NodePoolType poolType) @@ -108,6 +110,7 @@ public InternalNode( this.coordinator = coordinator; this.resourceManager = resourceManager; this.catalogServer = catalogServer; + this.coordinatorSidecar = coordinatorSidecar; this.nodeStatus = nodeStatus; this.raftPort = requireNonNull(raftPort, "raftPort is null"); this.poolType = requireNonNull(poolType, "poolType is null"); @@ -190,6 +193,13 @@ public boolean isCatalogServer() return catalogServer; } + @ThriftField(9) + @Override + public boolean isCoordinatorSidecar() + { + return 
coordinatorSidecar; + } + public OptionalInt getRaftPort() { return raftPort; @@ -231,6 +241,7 @@ public String toString() .add("coordinator", coordinator) .add("resourceManager", resourceManager) .add("catalogServer", catalogServer) + .add("coordinatorSidecar", coordinatorSidecar) .add("raftPort", raftPort) .add("poolType", poolType) .toString(); diff --git a/presto-main/src/main/java/com/facebook/presto/metadata/InternalNodeManager.java b/presto-main/src/main/java/com/facebook/presto/metadata/InternalNodeManager.java index c74e0193f47a3..81fa60c8165d7 100644 --- a/presto-main/src/main/java/com/facebook/presto/metadata/InternalNodeManager.java +++ b/presto-main/src/main/java/com/facebook/presto/metadata/InternalNodeManager.java @@ -37,6 +37,8 @@ public interface InternalNodeManager Set getCatalogServers(); + Set getCoordinatorSidecars(); + AllNodes getAllNodes(); void refreshNodes(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java b/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java index ac76a36bebc01..30e9f0b506bce 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerConfig.java @@ -29,6 +29,8 @@ public class ServerConfig private boolean resourceManager; private boolean resourceManagerEnabled; private boolean catalogServer; + private boolean coordinatorSidecar; + private boolean coordinatorSidecarEnabled; private boolean catalogServerEnabled; private boolean coordinator = true; private String prestoVersion = getClass().getPackage().getImplementationVersion(); @@ -90,6 +92,18 @@ public ServerConfig setCatalogServerEnabled(boolean catalogServerEnabled) return this; } + public boolean isCoordinatorSidecarEnabled() + { + return coordinatorSidecarEnabled; + } + + @Config("coordinator-sidecar-enabled") + public ServerConfig setCoordinatorSidecarEnabled(boolean coordinatorSidecarEnabled) + { + 
this.coordinatorSidecarEnabled = coordinatorSidecarEnabled; + return this; + } + public boolean isCoordinator() { return coordinator; diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index 24993f8006d9e..0b10eceff0890 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -167,6 +167,7 @@ public class TestingPrestoServer private final RequestBlocker requestBlocker; private final boolean resourceManager; private final boolean catalogServer; + private final boolean coordinatorSidecar; private final boolean coordinator; private final boolean nodeSchedulerIncludeCoordinator; private final ServerInfoResource serverInfoResource; @@ -238,6 +239,8 @@ public TestingPrestoServer( false, false, false, + false, + false, coordinator, properties, environment, @@ -252,6 +255,8 @@ public TestingPrestoServer( boolean resourceManagerEnabled, boolean catalogServer, boolean catalogServerEnabled, + boolean coordinatorSidecar, + boolean coordinatorSidecarEnabled, boolean coordinator, Map properties, String environment, @@ -263,6 +268,7 @@ public TestingPrestoServer( { this.resourceManager = resourceManager; this.catalogServer = catalogServer; + this.coordinatorSidecar = coordinatorSidecar; this.coordinator = coordinator; this.dataDirectory = dataDirectory.orElseGet(TestingPrestoServer::tempDirectory); @@ -275,7 +281,7 @@ public TestingPrestoServer( coordinatorPort = "0"; } - Map serverProperties = getServerProperties(resourceManagerEnabled, catalogServerEnabled, properties, environment, discoveryUri); + Map serverProperties = getServerProperties(resourceManagerEnabled, catalogServerEnabled, coordinatorSidecarEnabled, properties, environment, discoveryUri); ImmutableList.Builder modules = ImmutableList.builder() 
.add(new TestingNodeModule(Optional.ofNullable(environment))) @@ -380,6 +386,17 @@ else if (resourceManager) { eventListenerManager = ((TestingEventListenerManager) injector.getInstance(EventListenerManager.class)); clusterStateProvider = injector.getInstance(ResourceManagerClusterStateProvider.class); } + else if (coordinatorSidecar) { + dispatchManager = null; + queryManager = null; + resourceGroupManager = Optional.empty(); + nodePartitioningManager = null; + planOptimizerManager = null; + clusterMemoryManager = null; + statsCalculator = null; + eventListenerManager = null; + clusterStateProvider = null; + } else if (catalogServer) { dispatchManager = null; queryManager = null; @@ -425,6 +442,7 @@ else if (catalogServer) { private Map getServerProperties( boolean resourceManagerEnabled, boolean catalogServerEnabled, + boolean coordinatorSidecarEnabled, Map properties, String environment, URI discoveryUri) @@ -435,13 +453,14 @@ private Map getServerProperties( serverProperties.put("resource-manager-enabled", String.valueOf(resourceManagerEnabled)); serverProperties.put("catalog-server", String.valueOf(catalogServer)); serverProperties.put("catalog-server-enabled", String.valueOf(catalogServerEnabled)); + serverProperties.put("coordinator-sidecar-enabled", String.valueOf(coordinatorSidecarEnabled)); serverProperties.put("presto.version", "testversion"); serverProperties.put("task.concurrency", "4"); serverProperties.put("task.max-worker-threads", "4"); serverProperties.put("exchange.client-threads", "4"); serverProperties.put("optimizer.ignore-stats-calculator-failures", "false"); serverProperties.put("internal-communication.shared-secret", "internal-shared-secret"); - if (coordinator || resourceManager || catalogServer) { + if (coordinator || resourceManager || catalogServer || coordinatorSidecar) { // enabling failure detector in tests can make them flakey serverProperties.put("failure-detector.enabled", "false"); } diff --git 
a/presto-main/src/test/java/com/facebook/presto/catalogserver/TestRandomCatalogServerAddressSelector.java b/presto-main/src/test/java/com/facebook/presto/catalogserver/TestRandomCatalogServerAddressSelector.java index ebd7b1181cd00..6a038b38eec14 100644 --- a/presto-main/src/test/java/com/facebook/presto/catalogserver/TestRandomCatalogServerAddressSelector.java +++ b/presto-main/src/test/java/com/facebook/presto/catalogserver/TestRandomCatalogServerAddressSelector.java @@ -67,7 +67,8 @@ public void testAddressSelectionNoContext() "1", false, false, - true)); + true, + false)); internalNodeManager.addNode( CONNECTOR_ID, new InternalNode( @@ -77,7 +78,8 @@ public void testAddressSelectionNoContext() "1", false, false, - true)); + true, + false)); internalNodeManager.addNode( CONNECTOR_ID, new InternalNode( @@ -87,7 +89,8 @@ public void testAddressSelectionNoContext() "1", false, false, - true)); + true, + false)); Optional address = selector.selectAddress(Optional.empty()); assertTrue(address.isPresent()); diff --git a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java index 9b8c2ab9b1560..752d8947886fb 100644 --- a/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java +++ b/presto-main/src/test/java/com/facebook/presto/dispatcher/TestLocalDispatchQuery.java @@ -440,7 +440,7 @@ public void testQueryDispatched() private ClusterSizeMonitor createClusterSizeMonitor(int minimumNodes) { - return new ClusterSizeMonitor(new InMemoryNodeManager(), true, minimumNodes, minimumNodes, new Duration(10, MILLISECONDS), 1, 1, new Duration(1, SECONDS), 0); + return new ClusterSizeMonitor(new InMemoryNodeManager(), true, minimumNodes, minimumNodes, new Duration(10, MILLISECONDS), 1, 1, new Duration(1, SECONDS), new Duration(1, SECONDS), 0, false); } private QueryMonitor createQueryMonitor(CountingEventListener eventListener) diff --git 
a/presto-main/src/test/java/com/facebook/presto/execution/TestClusterSizeMonitor.java b/presto-main/src/test/java/com/facebook/presto/execution/TestClusterSizeMonitor.java index 19d8b4a62376b..de0ce3e05fd14 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestClusterSizeMonitor.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestClusterSizeMonitor.java @@ -17,6 +17,7 @@ import com.facebook.presto.metadata.InMemoryNodeManager; import com.facebook.presto.metadata.InternalNode; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.PrestoException; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.units.Duration; import org.testng.annotations.AfterMethod; @@ -44,14 +45,18 @@ public class TestClusterSizeMonitor public static final int DESIRED_COORDINATOR_COUNT_ACTIVE = 2; public static final int DESIRED_RESOURCE_MANAGER_COUNT_ACTIVE = 1; public static final int DESIRED_WORKER_COUNT_ACTIVE = 10; + public static final int DESIRED_COORDINATOR_SIDECAR_COUNT = 1; private InMemoryNodeManager nodeManager; private ClusterSizeMonitor monitor; private CountDownLatch minWorkersLatch; + private CountDownLatch minCoordinatorSidecarsLatch; private AtomicInteger numWorkers; private AtomicInteger numCoordinators; private AtomicInteger numResourceManagers; + private AtomicInteger numCoordinatorSidecars; private AtomicBoolean workersTimeout; + private AtomicBoolean coordinatorSidecarsTimeout; @BeforeMethod public void setUp() @@ -59,7 +64,10 @@ public void setUp() numWorkers = new AtomicInteger(0); numCoordinators = new AtomicInteger(0); numResourceManagers = new AtomicInteger(0); + numCoordinatorSidecars = new AtomicInteger(0); + workersTimeout = new AtomicBoolean(); + coordinatorSidecarsTimeout = new AtomicBoolean(); nodeManager = new InMemoryNodeManager(); monitor = new ClusterSizeMonitor( @@ -71,9 +79,12 @@ public void setUp() DESIRED_COORDINATOR_COUNT, DESIRED_COORDINATOR_COUNT_ACTIVE, new 
Duration(4, SECONDS), - DESIRED_RESOURCE_MANAGER_COUNT_ACTIVE); + new Duration(4, SECONDS), + DESIRED_RESOURCE_MANAGER_COUNT_ACTIVE, + true); minWorkersLatch = new CountDownLatch(1); + minCoordinatorSidecarsLatch = new CountDownLatch(1); monitor.start(); } @@ -121,6 +132,55 @@ public void testTimeoutWaitingForWorkers() assertEquals(minWorkersLatch.getCount(), 0); } + @Test(timeOut = 60_000) + public void testWaitForMinimumCoordinatorSidecars() + throws InterruptedException + { + ListenableFuture coordinatorSidecarsFuture = waitForMinimumCoordinatorSidecars(); + assertFalse(monitor.hasRequiredCoordinatorSidecars()); + assertFalse(coordinatorSidecarsTimeout.get()); + assertEquals(minCoordinatorSidecarsLatch.getCount(), 1); + addCoordinatorSidecar(nodeManager); + minCoordinatorSidecarsLatch.await(1, SECONDS); + assertTrue(coordinatorSidecarsFuture.isDone()); + assertFalse(coordinatorSidecarsTimeout.get()); + assertTrue(monitor.hasRequiredCoordinatorSidecars()); + } + + @Test(timeOut = 10_000) + public void testTimeoutWaitingForCoordinatorSidecars() + throws InterruptedException + { + waitForMinimumCoordinatorSidecars(); + assertFalse(coordinatorSidecarsTimeout.get()); + assertEquals(minCoordinatorSidecarsLatch.getCount(), 1); + Thread.sleep(SECONDS.toMillis(5)); + assertTrue(coordinatorSidecarsTimeout.get()); + assertEquals(minCoordinatorSidecarsLatch.getCount(), 0); + } + + @Test + public void testHasRequiredCoordinatorSidecars() + throws InterruptedException + { + assertFalse(monitor.hasRequiredCoordinatorSidecars()); + for (int i = numCoordinatorSidecars.get(); i < DESIRED_COORDINATOR_SIDECAR_COUNT; i++) { + addCoordinatorSidecar(nodeManager); + } + assertTrue(monitor.hasRequiredCoordinatorSidecars()); + } + + @Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = "Expected a single active coordinator sidecar. 
Found 2 active coordinator sidecars") + public void testHasRequiredCoordinatorSidecarsMoreThanOne() + throws InterruptedException + { + assertFalse(monitor.hasRequiredCoordinatorSidecars()); + for (int i = numCoordinatorSidecars.get(); i < DESIRED_COORDINATOR_SIDECAR_COUNT + 1; i++) { + addCoordinatorSidecar(nodeManager); + } + assertFalse(monitor.hasRequiredCoordinatorSidecars()); + } + @Test public void testHasRequiredResourceManagers() throws InterruptedException @@ -157,6 +217,20 @@ private ListenableFuture waitForMinimumWorkers() return workersFuture; } + private ListenableFuture waitForMinimumCoordinatorSidecars() + { + ListenableFuture coordinatorSidecarsFuture = monitor.waitForMinimumCoordinatorSidecars(); + addSuccessCallback(coordinatorSidecarsFuture, () -> { + assertFalse(coordinatorSidecarsTimeout.get()); + minCoordinatorSidecarsLatch.countDown(); + }); + addExceptionCallback(coordinatorSidecarsFuture, () -> { + assertTrue(coordinatorSidecarsTimeout.compareAndSet(false, true)); + minCoordinatorSidecarsLatch.countDown(); + }); + return coordinatorSidecarsFuture; + } + private void addWorker(InMemoryNodeManager nodeManager) { String identifier = "worker/" + numWorkers.incrementAndGet(); @@ -180,6 +254,22 @@ private void addResourceManager(InMemoryNodeManager nodeManager) new NodeVersion("1"), false, true, + false, false)); } + + private void addCoordinatorSidecar(InMemoryNodeManager nodeManager) + { + String identifier = "coordinator_sidecar/" + numCoordinatorSidecars.incrementAndGet(); + nodeManager.addNode( + CONNECTOR_ID, + new InternalNode( + identifier, + URI.create("localhost/" + identifier), + new NodeVersion("1"), + false, + false, + false, + true)); + } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index 622d5f2c64b3f..4ef563ad42406 100644 --- 
a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -72,6 +72,7 @@ public void testDefaults() .setRequiredWorkersMaxWait(new Duration(5, TimeUnit.MINUTES)) .setRequiredCoordinators(1) .setRequiredCoordinatorsMaxWait(new Duration(5, TimeUnit.MINUTES)) + .setRequiredCoordinatorSidecarsMaxWait(new Duration(5, TimeUnit.MINUTES)) .setRequiredResourceManagers(1) .setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2) .setUseStreamingExchangeForMarkDistinct(false) @@ -124,6 +125,7 @@ public void testExplicitPropertyMappings() .put("query-manager.required-workers-max-wait", "33m") .put("query-manager.experimental.required-coordinators", "999") .put("query-manager.experimental.required-coordinators-max-wait", "99m") + .put("query-manager.experimental.required-coordinator-sidecars-max-wait", "99m") .put("query-manager.experimental.required-resource-managers", "9") .put("query-manager.experimental.query-submission-max-threads", "5") .put("per-query-retry-limit", "10") @@ -174,6 +176,7 @@ public void testExplicitPropertyMappings() .setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES)) .setRequiredCoordinators(999) .setRequiredCoordinatorsMaxWait(new Duration(99, TimeUnit.MINUTES)) + .setRequiredCoordinatorSidecarsMaxWait(new Duration(99, TimeUnit.MINUTES)) .setRequiredResourceManagers(9) .setQuerySubmissionMaxThreads(5) .setUseStreamingExchangeForMarkDistinct(true) diff --git a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java index b4d3babc2d56c..1503f97b74723 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java @@ -1046,6 
+1046,7 @@ private InternalNodeManager createNodeManager() "1", false, false, + false, false)); internalNodeManager.addNode( new ConnectorId("dummy"), @@ -1056,6 +1057,7 @@ private InternalNodeManager createNodeManager() "1", false, false, + false, false)); internalNodeManager.addNode( new ConnectorId("dummy"), @@ -1066,6 +1068,7 @@ private InternalNodeManager createNodeManager() "1", false, false, + false, false)); return internalNodeManager; } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPartialResultQueryTaskTracker.java b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPartialResultQueryTaskTracker.java index e20e934642f21..141596c2dc4c6 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPartialResultQueryTaskTracker.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPartialResultQueryTaskTracker.java @@ -72,6 +72,7 @@ public void testPartialResultQueryTaskTracker() new NodeVersion("1"), false, false, + false, false); InternalNode node2 = new InternalNode( UUID.randomUUID().toString(), @@ -79,6 +80,7 @@ public void testPartialResultQueryTaskTracker() new NodeVersion("1"), false, false, + false, false); TaskId taskId1 = new TaskId("test1", 1, 0, 1, 0); TaskId taskId2 = new TaskId("test2", 2, 0, 1, 0); diff --git a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java index d0000f0c7f37d..b2ae47bdba249 100644 --- a/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java +++ b/presto-main/src/test/java/com/facebook/presto/metadata/TestDiscoveryNodeManager.java @@ -58,6 +58,8 @@ public class TestDiscoveryNodeManager private final NodeInfo coordinatorNodeInfo = new NodeInfo("test"); private final NodeInfo resourceManagerNodeInfo = new NodeInfo("test"); private final NodeInfo catalogServerNodeInfo = new 
NodeInfo("test"); + private final NodeInfo coordinatorSidecarNodeInfo = new NodeInfo("test"); + private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig(); private NodeVersion expectedVersion; private Set activeNodes; @@ -69,6 +71,8 @@ public class TestDiscoveryNodeManager private InternalNode resourceManager; private InternalNode catalogServer; private InternalNode inActiveCatalogServer; + private InternalNode coordinatorSidecar; + private InternalNode inActiveCoordinatorSidecar; private InternalNode workerNode1; private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector(); private HttpClient testHttpClient; @@ -90,6 +94,7 @@ public void setup() expectedVersion, false, true, + false, false); catalogServer = new InternalNode( catalogServerNodeInfo.getNodeId(), @@ -97,6 +102,15 @@ public void setup() expectedVersion, false, false, + true, + false); + coordinatorSidecar = new InternalNode( + coordinatorSidecarNodeInfo.getNodeId(), + URI.create("https://192.0.3.1"), + expectedVersion, + false, + false, + false, true); workerNode1 = new InternalNode(workerNodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false); workerNode2 = new InternalNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false); @@ -107,6 +121,7 @@ public void setup() new NodeVersion("2"), false, true, + false, false); inActiveCatalogServer = new InternalNode( catalogServerNodeInfo.getNodeId(), @@ -114,6 +129,15 @@ public void setup() new NodeVersion("2"), false, false, + true, + false); + inActiveCoordinatorSidecar = new InternalNode( + coordinatorSidecarNodeInfo.getNodeId(), + URI.create("https://192.0.3.2"), + new NodeVersion("2"), + false, + false, + false, true); inActiveCoordinator = new InternalNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.1"), new NodeVersion("2"), true); inActiveWorkerNode1 = new InternalNode(UUID.randomUUID().toString(), 
URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false); @@ -125,11 +149,13 @@ public void setup() .add(coordinator) .add(resourceManager) .add(catalogServer) + .add(coordinatorSidecar) .build(); inactiveNodes = ImmutableSet.of( inActiveCoordinator, inActiveResourceManager, inActiveCatalogServer, + inActiveCoordinatorSidecar, inActiveWorkerNode1, inActiveWorkerNode2); @@ -144,7 +170,7 @@ public void testGetAllNodesForWorkerNode() AllNodes allNodes = manager.getAllNodes(); Set activeNodes = allNodes.getActiveNodes(); - assertEqualsIgnoreOrder(activeNodes, ImmutableSet.of(resourceManager, catalogServer)); + assertEqualsIgnoreOrder(activeNodes, ImmutableSet.of(resourceManager, catalogServer, coordinatorSidecar)); for (InternalNode actual : activeNodes) { for (InternalNode expected : this.activeNodes) { @@ -155,7 +181,7 @@ public void testGetAllNodesForWorkerNode() assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE)); Set inactiveNodes = allNodes.getInactiveNodes(); - assertEqualsIgnoreOrder(inactiveNodes, ImmutableSet.of(inActiveResourceManager, inActiveCatalogServer)); + assertEqualsIgnoreOrder(inactiveNodes, ImmutableSet.of(inActiveResourceManager, inActiveCatalogServer, inActiveCoordinatorSidecar)); for (InternalNode actual : inactiveNodes) { for (InternalNode expected : this.inactiveNodes) { @@ -238,6 +264,40 @@ public void testGetAllNodesForResourceManager() } } + @Test + public void testGetAllNodesForCoordinatorSidecar() + { + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorSidecarNodeInfo, new NoOpFailureDetector(), Optional.empty(), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig); + try { + AllNodes allNodes = manager.getAllNodes(); + + Set activeNodes = allNodes.getActiveNodes(); + assertEqualsIgnoreOrder(activeNodes, this.activeNodes); + + for (InternalNode actual : activeNodes) { + for (InternalNode expected : this.activeNodes) { + assertNotSame(actual, expected); + } + } 
+ + assertEqualsIgnoreOrder(activeNodes, manager.getNodes(ACTIVE)); + + Set inactiveNodes = allNodes.getInactiveNodes(); + assertEqualsIgnoreOrder(inactiveNodes, this.inactiveNodes); + + for (InternalNode actual : inactiveNodes) { + for (InternalNode expected : this.inactiveNodes) { + assertNotSame(actual, expected); + } + } + + assertEqualsIgnoreOrder(inactiveNodes, manager.getNodes(INACTIVE)); + } + finally { + manager.stop(); + } + } + @Test public void testGetCurrentNode() { @@ -286,6 +346,18 @@ public void testGetCatalogServers() } } + @Test + public void testGetCoordinatorSidecar() + { + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, coordinatorSidecarNodeInfo, new NoOpFailureDetector(), Optional.of(host -> false), expectedVersion, testHttpClient, new TestingDriftClient<>(), internalCommunicationConfig); + try { + assertEquals(manager.getCoordinatorSidecars(), ImmutableSet.of(coordinatorSidecar)); + } + finally { + manager.stop(); + } + } + @SuppressWarnings("ResultOfObjectAllocationIgnored") @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".* current node not returned .*") public void testGetCurrentNodeRequired() @@ -339,6 +411,7 @@ private synchronized void announceNodes(Set activeNodes, Set address = selector.selectAddress(Optional.empty()); diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java index 8d6360a66fb54..a3381a2a28ed9 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java @@ -78,6 +78,7 @@ public void setup() "1", false, true, + false, false)); sender = new ResourceManagerClusterStatusSender( diff --git 
a/presto-main/src/test/java/com/facebook/presto/server/TestServerConfig.java b/presto-main/src/test/java/com/facebook/presto/server/TestServerConfig.java index 6656c9692fdf1..56f94a1ec216b 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestServerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestServerConfig.java @@ -45,6 +45,7 @@ public void testDefaults() .setResourceManager(false) .setCatalogServer(false) .setCatalogServerEnabled(false) + .setCoordinatorSidecarEnabled(false) .setPoolType(DEFAULT) .setClusterStatsExpirationDuration(new Duration(0, MILLISECONDS)) .setNestedDataSerializationEnabled(true) @@ -66,6 +67,7 @@ public void testExplicitPropertyMappings() .put("resource-manager", "true") .put("catalog-server-enabled", "true") .put("catalog-server", "true") + .put("coordinator-sidecar-enabled", "true") .put("pool-type", "LEAF") .put("cluster-stats-expiration-duration", "10s") .put("nested-data-serialization-enabled", "false") @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings() .setResourceManager(true) .setCatalogServer(true) .setCatalogServerEnabled(true) + .setCoordinatorSidecarEnabled(true) .setPoolType(LEAF) .setClusterStatsExpirationDuration(new Duration(10, SECONDS)) .setNestedDataSerializationEnabled(false) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkInternalNodeManager.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkInternalNodeManager.java index 960888937c798..1046a820ad367 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkInternalNodeManager.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkInternalNodeManager.java @@ -41,6 +41,7 @@ public class PrestoSparkInternalNodeManager ImmutableSet.of(), ImmutableSet.of(), ImmutableSet.of(), + ImmutableSet.of(), ImmutableSet.of()); @Override @@ -94,6 +95,12 @@ public Set getCatalogServers() throw new 
UnsupportedOperationException(); } + @Override + public Set getCoordinatorSidecars() + { + throw new UnsupportedOperationException(); + } + @Override public AllNodes getAllNodes() { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/Node.java b/presto-spi/src/main/java/com/facebook/presto/spi/Node.java index 7a592b8f7b5d5..5145c1b71cded 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/Node.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/Node.java @@ -37,5 +37,7 @@ public interface Node boolean isCatalogServer(); + boolean isCoordinatorSidecar(); + NodePoolType getPoolType(); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java index 17169181ca3dc..89b20d935d750 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java @@ -136,6 +136,8 @@ public enum StandardErrorCode MISSING_RESOURCE_GROUP_SELECTOR(0x0002_0010, INTERNAL_ERROR), EXCEEDED_HEAP_MEMORY_LIMIT(0x0002_0011, INSUFFICIENT_RESOURCES), EXCEEDED_WRITTEN_INTERMEDIATE_BYTES_LIMIT(0x0002_0012, INSUFFICIENT_RESOURCES), + TOO_MANY_SIDECARS(0x0002_0013, INTERNAL_ERROR), + NO_CPP_SIDECARS(0x0002_0014, INTERNAL_ERROR), /**/; // Error code range 0x0003 is reserved for Presto-on-Spark diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java index 4db85b08f4684..1dd920c49555f 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java @@ -120,6 +120,8 @@ public class DistributedQueryRunner private final ReadWriteLock lock = new ReentrantReadWriteLock(); private Optional catalogServer = Optional.empty(); private Optional> resourceManagers; + 
private Optional coordinatorSidecar = Optional.empty(); + private final int resourceManagerCount; private final AtomicReference testFunctionNamespacesHandle = new AtomicReference<>(); @@ -135,6 +137,7 @@ public DistributedQueryRunner(Session defaultSession, int nodeCount, Map coordinatorProperties, Map resourceManagerProperties, Map catalogServerProperties, + Map coordinatorSidecarProperties, SqlParserOptions parserOptions, String environment, Optional dataDirectory, @@ -217,17 +223,19 @@ private DistributedQueryRunner( Map workerProperties = new HashMap<>(extraProperties); workerProperties.put("pool-type", workerPool.name()); TestingPrestoServer worker = closer.register(createTestingPrestoServer( - discoveryUrl, - false, - resourceManagerEnabled, - false, - catalogServerEnabled, - false, - workerProperties, - parserOptions, - environment, - dataDirectory, - extraModules)); + discoveryUrl, + false, + resourceManagerEnabled, + false, + catalogServerEnabled, + false, + coordinatorSidecarEnabled, + false, + workerProperties, + parserOptions, + environment, + dataDirectory, + extraModules)); servers.add(worker); } } @@ -250,6 +258,8 @@ private DistributedQueryRunner( false, false, false, + false, + false, rmProperties, parserOptions, environment, @@ -268,6 +278,8 @@ private DistributedQueryRunner( true, true, false, + false, + false, catalogServerProperties, parserOptions, environment, @@ -276,6 +288,24 @@ private DistributedQueryRunner( servers.add(catalogServer.get()); } + if (coordinatorSidecarEnabled) { + coordinatorSidecar = Optional.of(closer.register(createTestingPrestoServer( + discoveryUrl, + false, + false, + false, + false, + true, + true, + false, + coordinatorSidecarProperties, + parserOptions, + environment, + dataDirectory, + extraModules))); + servers.add(coordinatorSidecar.get()); + } + for (int i = 0; i < coordinatorCount; i++) { TestingPrestoServer coordinator = closer.register(createTestingPrestoServer( discoveryUrl, @@ -283,6 +313,8 @@ private 
DistributedQueryRunner( resourceManagerEnabled, false, catalogServerEnabled, + false, + false, true, extraCoordinatorProperties, parserOptions, @@ -378,6 +410,8 @@ private static TestingPrestoServer createTestingPrestoServer( boolean resourceManagerEnabled, boolean catalogServer, boolean catalogServerEnabled, + boolean coordinatorSidecar, + boolean coordinatorSidecarEnabled, boolean coordinator, Map extraProperties, SqlParserOptions parserOptions, @@ -406,6 +440,8 @@ private static TestingPrestoServer createTestingPrestoServer( resourceManagerEnabled, catalogServer, catalogServerEnabled, + coordinatorSidecar, + coordinatorSidecarEnabled, coordinator, properties, environment, @@ -424,6 +460,9 @@ else if (resourceManager) { else if (catalogServer) { nodeRole = "catalogServer"; } + else if (coordinatorSidecar) { + nodeRole = "coordinatorSidecar"; + } log.info("Created %s TestingPrestoServer in %s: %s", nodeRole, nanosSince(start).convertToMostSuccinctTimeUnit(), server.getBaseUrl()); return server; @@ -559,6 +598,11 @@ public Optional getCatalogServer() return catalogServer; } + public Optional getCoordinatorSidecar() + { + return coordinatorSidecar; + } + public TestingPrestoServer getResourceManager(int resourceManager) { checkState(resourceManager < resourceManagers.get().size(), format("Expected resource manager index %d < %d", resourceManager, resourceManagerCount)); @@ -872,12 +916,14 @@ public static class Builder private Map coordinatorProperties = ImmutableMap.of(); private Map resourceManagerProperties = ImmutableMap.of(); private Map catalogServerProperties = ImmutableMap.of(); + private Map coordinatorSidecarProperties = ImmutableMap.of(); private SqlParserOptions parserOptions = DEFAULT_SQL_PARSER_OPTIONS; private String environment = ENVIRONMENT; private Optional dataDirectory = Optional.empty(); private Optional> externalWorkerLauncher = Optional.empty(); private boolean resourceManagerEnabled; private boolean catalogServerEnabled; + private boolean 
coordinatorSidecarEnabled; private List extraModules = ImmutableList.of(); private int resourceManagerCount = 1; @@ -939,6 +985,12 @@ public Builder setCatalogServerProperties(Map catalogServerPrope return this; } + public Builder setCoordinatorSidecarProperties(Map coordinatorSidecarProperties) + { + this.coordinatorSidecarProperties = coordinatorSidecarProperties; + return this; + } + /** * Sets coordinator properties being equal to a map containing given key and value. * Note, that calling this method OVERWRITES previously set property values. @@ -985,6 +1037,12 @@ public Builder setCatalogServerEnabled(boolean catalogServerEnabled) return this; } + public Builder setCoordinatorSidecarEnabled(boolean coordinatorSidecarEnabled) + { + this.coordinatorSidecarEnabled = coordinatorSidecarEnabled; + return this; + } + public Builder setExtraModules(List extraModules) { this.extraModules = extraModules; @@ -1003,6 +1061,7 @@ public DistributedQueryRunner build() return new DistributedQueryRunner( resourceManagerEnabled, catalogServerEnabled, + coordinatorSidecarEnabled, defaultSession, nodeCount, coordinatorCount, @@ -1011,6 +1070,7 @@ public DistributedQueryRunner build() coordinatorProperties, resourceManagerProperties, catalogServerProperties, + coordinatorSidecarProperties, parserOptions, environment, dataDirectory, diff --git a/presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java b/presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java index 78bdedbfae96a..30951ad186cbf 100644 --- a/presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java +++ b/presto-tests/src/test/java/com/facebook/presto/memory/TestMemoryManager.java @@ -446,7 +446,7 @@ public void reservedPoolDisabledMultiCoordinatorSetup() .put("experimental.reserved-pool-enabled", "false") .build(); - queryRunner2 = createQueryRunner(rmProperties, extraProperties, coordinatorProperties, extraProperties, 2); + queryRunner2 = 
createQueryRunner(rmProperties, extraProperties, extraProperties, coordinatorProperties, extraProperties, 2); } @AfterGroups(groups = {"reservedPoolDisabledMultiCoordinator"}) @@ -503,7 +503,7 @@ public void clusterPoolsMultiCoordinatorSetup() .put("resource-manager.query-heartbeat-interval", "10ms") .put("resource-manager.node-status-timeout", "5s") .build(); - queryRunner2 = createQueryRunner(properties, ImmutableMap.of(), ImmutableMap.of(), properties, 2); + queryRunner2 = createQueryRunner(properties, ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), properties, 2); } @AfterGroups(groups = {"clusterPoolsMultiCoordinator"}) diff --git a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java index d1c34b1bf1ce6..cc09b4a10a54e 100644 --- a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java +++ b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java @@ -75,6 +75,7 @@ public void setup() "resource-manager.query-expiration-timeout", "4m", "resource-manager.completed-query-expiration-timeout", "4m"), ImmutableMap.of(), + ImmutableMap.of(), ImmutableMap.of( "query.client.timeout", "20s", "resource-manager.query-heartbeat-interval", "100ms", diff --git a/presto-tests/src/test/java/com/facebook/presto/server/TestServerInfoResource.java b/presto-tests/src/test/java/com/facebook/presto/server/TestServerInfoResource.java index 00d532fd53a86..298dee315a0e4 100644 --- a/presto-tests/src/test/java/com/facebook/presto/server/TestServerInfoResource.java +++ b/presto-tests/src/test/java/com/facebook/presto/server/TestServerInfoResource.java @@ -85,6 +85,7 @@ public void createQueryRunnerSetup() throws Exception { queryRunner = createQueryRunner( + ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), 
ImmutableMap.of("cluster.required-resource-managers-active", "1", "cluster.required-coordinators-active", "1"), @@ -105,6 +106,7 @@ public void createQueryRunnerWithNoClusterReadyCheckSetup() throws Exception { queryRunner = createQueryRunnerWithNoClusterReadyCheck( + ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of("cluster.required-resource-managers-active", "2", "cluster.required-coordinators-active", "1"), @@ -125,6 +127,7 @@ public void getServerStateWithoutRequiredCoordinatorsSetup() throws Exception { queryRunner = createQueryRunnerWithNoClusterReadyCheck( + ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of(), ImmutableMap.of("cluster.required-resource-managers-active", "1", "cluster.required-coordinators-active", "3"), diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/tpch/TpchQueryRunner.java b/presto-tests/src/test/java/com/facebook/presto/tests/tpch/TpchQueryRunner.java index dedff5b4f8037..bd0e530ee53e5 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/tpch/TpchQueryRunner.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/tpch/TpchQueryRunner.java @@ -54,38 +54,41 @@ public static DistributedQueryRunner createQueryRunner(Map extra public static DistributedQueryRunner createQueryRunner( Map resourceManagerProperties, Map catalogServerProperties, + Map coordinatorSidecarProperties, Map coordinatorProperties, Map extraProperties, int coordinatorCount) throws Exception { - return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorProperties, extraProperties, coordinatorCount, false, 1); + return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorSidecarProperties, coordinatorProperties, extraProperties, coordinatorCount, false, 1); } public static DistributedQueryRunner createQueryRunnerWithNoClusterReadyCheck( Map resourceManagerProperties, Map catalogServerProperties, + Map coordinatorSidecarProperties, Map 
coordinatorProperties, Map extraProperties, int coordinatorCount) throws Exception { - return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorProperties, extraProperties, coordinatorCount, true, 1); + return createQueryRunner(resourceManagerProperties, catalogServerProperties, coordinatorSidecarProperties, coordinatorProperties, extraProperties, coordinatorCount, true, 1); } public static DistributedQueryRunner createQueryRunner(Map resourceManagerProperties, Map coordinatorProperties, Map extraProperties, int coordinatorCount, int resourceManagerCount) throws Exception { - return createQueryRunner(resourceManagerProperties, ImmutableMap.of(), coordinatorProperties, extraProperties, coordinatorCount, false, resourceManagerCount); + return createQueryRunner(resourceManagerProperties, ImmutableMap.of(), ImmutableMap.of(), coordinatorProperties, extraProperties, coordinatorCount, false, resourceManagerCount); } - public static DistributedQueryRunner createQueryRunner(Map resourceManagerProperties, Map catalogServerProperties, Map coordinatorProperties, Map extraProperties, int coordinatorCount, boolean skipClusterReadyCheck, int resourceManagerCount) + public static DistributedQueryRunner createQueryRunner(Map resourceManagerProperties, Map catalogServerProperties, Map coordinatorSidecarProperties, Map coordinatorProperties, Map extraProperties, int coordinatorCount, boolean skipClusterReadyCheck, int resourceManagerCount) throws Exception { DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder() .setResourceManagerProperties(resourceManagerProperties) .setCatalogServerProperties(catalogServerProperties) .setCoordinatorProperties(coordinatorProperties) + .setCoordinatorSidecarProperties(coordinatorSidecarProperties) .setExtraProperties(extraProperties) .setResourceManagerEnabled(true) .setCoordinatorCount(coordinatorCount)