diff --git a/presto-main/src/main/java/com/facebook/presto/execution/ForQueryScheduling.java b/presto-main/src/main/java/com/facebook/presto/execution/ForQueryScheduling.java deleted file mode 100644 index e6cd4cfb4358a..0000000000000 --- a/presto-main/src/main/java/com/facebook/presto/execution/ForQueryScheduling.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.execution; - -import javax.inject.Qualifier; - -import java.lang.annotation.Retention; -import java.lang.annotation.Target; - -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -@Retention(RUNTIME) -@Target({FIELD, PARAMETER, METHOD}) -@Qualifier -public @interface ForQueryScheduling -{ -} diff --git a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java index 0ae5481500b7b..53be31e8f37de 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java @@ -33,7 +33,6 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -61,7 +60,6 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.math.LongMath.saturatedAdd; import static com.google.common.math.LongMath.saturatedMultiply; @@ -90,7 +88,6 @@ public class InternalResourceGroup private final BiConsumer jmxExportListener; private final Executor executor; private final boolean staticResourceGroup; - private final int version; // Configuration // ============= @@ -143,22 +140,15 @@ public class InternalResourceGroup private long lastStartMillis; @GuardedBy("root") private final CounterStat timeBetweenStartsSec = new CounterStat(); - @GuardedBy("root") - private Optional next; - @GuardedBy("root") - private Optional prev; protected InternalResourceGroup( Optional parent, String name, BiConsumer jmxExportListener, Executor executor, - boolean staticResourceGroup, - int version) + boolean staticResourceGroup) { this.parent = requireNonNull(parent, "parent is null"); - this.next = Optional.empty(); - this.prev = Optional.empty(); this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null"); this.executor = requireNonNull(executor, "executor is null"); requireNonNull(name, "name is null"); @@ -171,34 +161,6 @@ protected InternalResourceGroup( root = this; } this.staticResourceGroup = staticResourceGroup; - this.version = version; - } - - protected InternalResourceGroup( - Optional parent, - String name, - BiConsumer jmxExportListener, - Executor executor, - InternalResourceGroup root, - boolean staticResourceGroup, - Integer version) - { - this.parent = requireNonNull(parent, "parent is null"); - this.next = Optional.empty(); - this.prev = Optional.empty(); - this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null"); - this.executor = requireNonNull(executor, "executor is null"); - requireNonNull(name, "name is null"); - if (parent.isPresent()) { - id = new ResourceGroupId(parent.get().id, name); - this.root = root; - } - else { - id = new ResourceGroupId(name); - this.root = root; - } - this.staticResourceGroup = staticResourceGroup; - this.version = version; } public ResourceGroupInfo getResourceGroupInfo(boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly) @@ -214,55 +176,15 @@ public ResourceGroupInfo getResourceGroupInfo(boolean includeQueryInfo, boolean hardConcurrencyLimit, maxQueuedQueries, DataSize.succinctBytes(cachedMemoryUsageBytes), - getAllQueuedQueriesCount(), - getAllRunningQueriesCount(), + getQueuedQueries(), + getRunningQueries(), eligibleSubGroups.size(), subGroups.values().stream() .filter(group -> group.getRunningQueries() + group.getQueuedQueries() > 0) .filter(group -> !includeStaticSubgroupsOnly || group.isStaticResourceGroup()) .map(group -> summarizeSubgroups ? group.getSummaryInfo() : group.getResourceGroupInfo(includeQueryInfo, false, includeStaticSubgroupsOnly)) .collect(toImmutableList()), - includeQueryInfo ? getAggregatedRunningQueriesInfo() : null, - version); - } - } - - public List getAllVersionResourceGroupInfo(boolean includeQueryInfo, boolean summarizeSubgroups, boolean includeStaticSubgroupsOnly) - { - synchronized (root) { - List allVersionResourceGroupinfo = new ArrayList<>(); - InternalResourceGroup group = this; - while (group != null) { - ResourceGroupInfo resourceGroupInfo = new ResourceGroupInfo( - group.id, - group.getState(), - group.schedulingPolicy, - group.schedulingWeight, - DataSize.succinctBytes(group.softMemoryLimitBytes), - group.softConcurrencyLimit, - group.hardConcurrencyLimit, - group.maxQueuedQueries, - DataSize.succinctBytes(group.cachedMemoryUsageBytes), - group.getAllQueuedQueriesCount(), - group.getAllRunningQueriesCount(), - group.eligibleSubGroups.size(), - group.subGroups.values().stream() - .filter(subGroup -> subGroup.getRunningQueries() + subGroup.getQueuedQueries() > 0) - .filter(subGroup -> !includeStaticSubgroupsOnly || subGroup.isStaticResourceGroup()) - .map(subGroup -> summarizeSubgroups ? subGroup.getSummaryInfo() : subGroup.getResourceGroupInfo(includeQueryInfo, false, includeStaticSubgroupsOnly)) - .collect(toImmutableList()), - includeQueryInfo ? group.getAggregatedRunningQueriesInfo() : null, - group.version); - - allVersionResourceGroupinfo.add(resourceGroupInfo); - if (group.getPrev().isPresent()) { - group = group.getPrev().get(); - } - else { - group = null; - } - } - return allVersionResourceGroupinfo; + includeQueryInfo ? getAggregatedRunningQueriesInfo() : null); } } @@ -286,8 +208,7 @@ public ResourceGroupInfo getInfo() .filter(group -> group.getRunningQueries() + group.getQueuedQueries() > 0) .map(InternalResourceGroup::getSummaryInfo) .collect(toImmutableList()), - null, - version); + null); } } @@ -304,12 +225,11 @@ private ResourceGroupInfo getSummaryInfo() hardConcurrencyLimit, maxQueuedQueries, DataSize.succinctBytes(cachedMemoryUsageBytes), - getAllQueuedQueriesCount(), - getAllRunningQueriesCount(), + getQueuedQueries(), + getRunningQueries(), eligibleSubGroups.size(), null, - null, - version); + null); } } @@ -350,21 +270,6 @@ private List getAggregatedRunningQueriesInfo() } } - private List getAllAggregatedRunningQueriesInfo() - { - synchronized (root) { - InternalResourceGroup resourceGroup = this; - ImmutableList.Builder result = ImmutableList.builder(); - result.addAll(resourceGroup.getAggregatedRunningQueriesInfo()); - while (resourceGroup.getPrev().isPresent()) { - resourceGroup = resourceGroup.getPrev().get(); - result.addAll(resourceGroup.getAggregatedRunningQueriesInfo()); - } - - return result.build(); - } - } - public List getPathToRoot() { synchronized (root) { @@ -393,31 +298,6 @@ public int getRunningQueries() } } - private int getAllRunningQueriesCount() - { - synchronized (root) { - int allRunningQueries = getRunningQueries(); - if (getPrev().isPresent()) { - allRunningQueries += getPrev().get().getRunningQueries(); - } - return allRunningQueries; - } - } - - public int getEligibleSubGroupsCount() - { - synchronized (root) { - return eligibleSubGroups.size(); - } - } - - public int getDirtySubGroupsCount() - { - synchronized (root) { - return dirtySubGroups.size(); - } - } - @Managed public int getQueuedQueries() { @@ -426,34 +306,6 @@ public int getQueuedQueries() } } - public int getAllQueuedQueriesCount() - { - synchronized (root) { - int allQueuedQueries = getQueuedQueries(); - if (getPrev().isPresent()) { - allQueuedQueries += getPrev().get().getQueuedQueries(); - } - return allQueuedQueries; - } - } - - public void requeueToNext() - { - synchronized (root) { - if (getQueuedQueries() == 0 || !next.isPresent() || !next.get().isLeafResourceGroup() || next.get().getHardConcurrencyLimit() == 0) { - return; - } - - while (!queuedQueries.isEmpty()) { - next.get().enqueueQuery(queuedQueries.poll()); - if (parent.isPresent()) { - parent.get().descendantQueuedQueries--; - } - } - updateEligibility(); - } - } - @Managed public int getWaitingQueuedQueries() { @@ -726,104 +578,28 @@ public void setJmxExport(boolean export) jmxExportListener.accept(this, export); } - @Override - public int getVersion() - { - return version; - } - - public void setNext(InternalResourceGroup nextNeighbour) - { - synchronized (root) { - next = Optional.of(nextNeighbour); - } - } - - public Optional getNext() - { - synchronized (root) { - return next; - } - } - - public void setPrev(InternalResourceGroup prevNeighbour) - { - synchronized (root) { - if (prevNeighbour == null) { - prev = Optional.empty(); - } - else { - prev = Optional.of(prevNeighbour); - } - } - } - - public Optional getPrev() - { - synchronized (root) { - return prev; - } - } - - //Get the latest version of the resource group - public InternalResourceGroup getLatest() - { - synchronized (root) { - InternalResourceGroup latest = this; - while (latest.next.isPresent()) { - latest = latest.next.get(); - } - return latest; - } - } - - private boolean isLeafResourceGroup() - { - synchronized (root) { - for (InternalResourceGroup group : subGroups.values()) { - if (group.getMaxQueuedQueries() != 0 || group.getHardConcurrencyLimit() != 0) { - return false; - } - } - return true; - } - } - - public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegment, int version) + public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegment) { requireNonNull(name, "name is null"); synchronized (root) { - verify(getLatest().runningQueries.isEmpty() && getLatest().queuedQueries.isEmpty(), "Cannot add sub group to %s while queries are running", id); + checkArgument(runningQueries.isEmpty() && queuedQueries.isEmpty(), "Cannot add sub group to %s while queries are running", id); if (subGroups.containsKey(name)) { - InternalResourceGroup subGroup = subGroups.get(name).getLatest(); - if (subGroup.getVersion() == version) { - return subGroup; - } + return subGroups.get(name); } // parent segments size equals to subgroup segment index int subGroupSegmentIndex = id.getSegments().size(); - InternalResourceGroup newSubGroup = new InternalResourceGroup( + InternalResourceGroup subGroup = new InternalResourceGroup( Optional.of(this), name, jmxExportListener, executor, - staticResourceGroup && staticSegment, - version); - + staticResourceGroup && staticSegment); // Sub group must use query priority to ensure ordering if (schedulingPolicy == QUERY_PRIORITY) { - newSubGroup.setSchedulingPolicy(QUERY_PRIORITY); + subGroup.setSchedulingPolicy(QUERY_PRIORITY); } - if (subGroups.containsKey(name)) { - InternalResourceGroup subGroup = subGroups.get(name).getLatest(); - subGroup.setNext(newSubGroup); - newSubGroup.setPrev(subGroup); - } - else { - subGroups.put(name, newSubGroup); - } - - return newSubGroup; + subGroups.put(name, subGroup); + return subGroup; } } @@ -847,10 +623,9 @@ public int getRunningTaskCount() public void run(ManagedQueryExecution query) { synchronized (root) { - if (!isLeafResourceGroup()) { + if (!subGroups.isEmpty()) { throw new PrestoException(INVALID_RESOURCE_GROUP, format("Cannot add queries to %s. It is not a leaf group.", id)); } - // Check all ancestors for capacity InternalResourceGroup group = this; boolean canQueue = true; @@ -910,11 +685,6 @@ private void updateEligibility() parent.get().eligibleSubGroups.remove(this); lastStartMillis = 0; } - Optional next = getNext(); - while (next.isPresent()) { - next.get().updateEligibility(); - next = next.get().getNext(); - } parent.get().updateEligibility(); } } @@ -1021,7 +791,6 @@ protected boolean internalStartNext() if (!canRunMore()) { return false; } - ManagedQueryExecution query = queuedQueries.poll(); if (query != null) { startInBackground(query); @@ -1100,7 +869,7 @@ private boolean isEligibleToStartNext() if (!canRunMore()) { return false; } - return !queuedQueries.isEmpty() || getEligibleSubGroupsCount() > 0; + return !queuedQueries.isEmpty() || !eligibleSubGroups.isEmpty(); } } @@ -1120,27 +889,7 @@ private boolean canQueueMore() { checkState(Thread.holdsLock(root), "Must hold lock"); synchronized (root) { - return getQueuedQueries() < maxQueuedQueries; - } - } - - private long getCpuUsageMillis() - { - synchronized (root) { - if (getPrev().isPresent()) { - return cpuUsageMillis + getPrev().get().getCpuUsageMillis(); - } - return cpuUsageMillis; - } - } - - private long getCachedMemoryUsageBytes() - { - synchronized (root) { - if (getPrev().isPresent()) { - return cachedMemoryUsageBytes + getPrev().get().getCachedMemoryUsageBytes(); - } - return cachedMemoryUsageBytes; + return descendantQueuedQueries + queuedQueries.size() < maxQueuedQueries; } } @@ -1148,7 +897,7 @@ private boolean canRunMore() { checkState(Thread.holdsLock(root), "Must hold lock"); synchronized (root) { - if (getCpuUsageMillis() >= hardCpuLimitMillis) { + if (cpuUsageMillis >= hardCpuLimitMillis) { return false; } @@ -1157,18 +906,18 @@ private boolean canRunMore() } int hardConcurrencyLimit = this.hardConcurrencyLimit; - if (getCpuUsageMillis() >= softCpuLimitMillis) { + if (cpuUsageMillis >= softCpuLimitMillis) { // TODO: Consider whether cpu limit math should be performed on softConcurrency or hardConcurrency // Linear penalty between soft and hard limit - double penalty = (getCpuUsageMillis() - softCpuLimitMillis) / (double) (hardCpuLimitMillis - softCpuLimitMillis); + double penalty = (cpuUsageMillis - softCpuLimitMillis) / (double) (hardCpuLimitMillis - softCpuLimitMillis); hardConcurrencyLimit = (int) Math.floor(hardConcurrencyLimit * (1 - penalty)); // Always penalize by at least one hardConcurrencyLimit = min(this.hardConcurrencyLimit - 1, hardConcurrencyLimit); // Always allow at least one running query hardConcurrencyLimit = Math.max(1, hardConcurrencyLimit); } - return getAllRunningQueriesCount() < hardConcurrencyLimit && - getCachedMemoryUsageBytes() <= softMemoryLimitBytes; + return runningQueries.size() + descendantRunningQueries < hardConcurrencyLimit && + cachedMemoryUsageBytes <= softMemoryLimitBytes; } } @@ -1206,36 +955,6 @@ public int hashCode() return Objects.hash(id); } - public InternalResourceGroup getRoot() - { - return root; - } - - public void purgeExpiredResourceGroups() - { - synchronized (root) { - if (getRunningQueries() > 0 || getQueuedQueries() > 0) { - return; - } - for (InternalResourceGroup group : subGroups.values()) { - group.purgeExpiredResourceGroups(); - } - if (!getPrev().isPresent()) { - if (getRunningQueries() == 0 && getQueuedQueries() == 0) { - if (next.isPresent() && parent.isPresent()) { - next.get().setPrev(null); - next = Optional.empty(); - } - subGroups.clear(); - while (!eligibleSubGroups.isEmpty()) { - eligibleSubGroups.poll(); - } - dirtySubGroups.clear(); - } - } - } - } - @ThreadSafe public static final class RootInternalResourceGroup extends InternalResourceGroup @@ -1245,33 +964,23 @@ public static final class RootInternalResourceGroup public RootInternalResourceGroup( String name, BiConsumer jmxExportListener, - Executor executor, - Integer version) + Executor executor) { - super(Optional.empty(), name, jmxExportListener, executor, true, version); + super(Optional.empty(), name, jmxExportListener, executor, true); } - public RootInternalResourceGroup(String name, BiConsumer jmxExportListener, Executor executor, InternalResourceGroup root, Integer version) + public synchronized void processQueuedQueries() { - super(Optional.empty(), name, jmxExportListener, executor, root, true, version); - } - - public void processQueuedQueries() - { - synchronized (super.root) { - internalRefreshStats(); - while (internalStartNext()) { - // start all the queries we can - } + internalRefreshStats(); + while (internalStartNext()) { + // start all the queries we can } } - public void generateCpuQuota(long elapsedSeconds) + public synchronized void generateCpuQuota(long elapsedSeconds) { - synchronized (super.root) { - if (elapsedSeconds > 0) { - internalGenerateCpuQuota(elapsedSeconds); - } + if (elapsedSeconds > 0) { + internalGenerateCpuQuota(elapsedSeconds); } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java index 29293e74527c6..3ffbbcfc93c82 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java @@ -15,7 +15,6 @@ import com.facebook.airlift.log.Logger; import com.facebook.airlift.node.NodeInfo; -import com.facebook.presto.execution.ForQueryScheduling; import com.facebook.presto.execution.ManagedQueryExecution; import com.facebook.presto.execution.QueryManagerConfig; import com.facebook.presto.execution.resourceGroups.InternalResourceGroup.RootInternalResourceGroup; @@ -30,7 +29,6 @@ import com.facebook.presto.spi.resourceGroups.SelectionCriteria; import com.facebook.presto.sql.tree.Statement; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.weakref.jmx.JmxException; import org.weakref.jmx.MBeanExporter; @@ -51,7 +49,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -69,7 +66,6 @@ import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; @ThreadSafe public final class InternalResourceGroupManager @@ -79,9 +75,7 @@ public final class InternalResourceGroupManager private static final File RESOURCE_GROUPS_CONFIGURATION = new File("etc/resource-groups.properties"); private static final String CONFIGURATION_MANAGER_PROPERTY_NAME = "resource-groups.configuration-manager"; - private final ExecutorService refreshResourceGroupExecutor; private final ScheduledExecutorService refreshExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("ResourceGroupManager")); - private final ScheduledExecutorService resourceGroupRefreshExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("ResourceGroupRefreshManager")); private final List rootGroups = new CopyOnWriteArrayList<>(); private final ConcurrentMap groups = new ConcurrentHashMap<>(); private final AtomicReference> configurationManager; @@ -101,8 +95,7 @@ public InternalResourceGroupManager( ClusterMemoryPoolManager memoryPoolManager, QueryManagerConfig queryManagerConfig, NodeInfo nodeInfo, - MBeanExporter exporter, - @ForQueryScheduling ExecutorService executorService) + MBeanExporter exporter) { requireNonNull(queryManagerConfig, "queryManagerConfig is null"); this.exporter = requireNonNull(exporter, "exporter is null"); @@ -110,7 +103,6 @@ public InternalResourceGroupManager( this.legacyManager = requireNonNull(legacyManager, "legacyManager is null"); this.configurationManager = new AtomicReference<>(cast(legacyManager)); this.maxTotalRunningTaskCountToNotExecuteNewQuery = queryManagerConfig.getMaxTotalRunningTaskCountToNotExecuteNewQuery(); - this.refreshResourceGroupExecutor = executorService; } @Override @@ -132,7 +124,7 @@ public void submit(Statement statement, ManagedQueryExecution queryExecution, Se { checkState(configurationManager.get() != null, "configurationManager not set"); createGroupIfNecessary(selectionContext, executor); - groups.get(selectionContext.getResourceGroupId()).getLatest().run(queryExecution); + groups.get(selectionContext.getResourceGroupId()).run(queryExecution); } @Override @@ -162,9 +154,6 @@ public void loadConfigurationManager() "Resource groups configuration %s does not contain %s", RESOURCE_GROUPS_CONFIGURATION.getAbsoluteFile(), CONFIGURATION_MANAGER_PROPERTY_NAME); setConfigurationManager(configurationManagerName, properties); - if (configurationManager.get().dynamicReloadSupported()) { - resourceGroupRefreshExecutor.scheduleWithFixedDelay(this::refreshInternalResourceGroups, 1, 1, SECONDS); - } } } @@ -198,7 +187,6 @@ public ResourceGroupConfigurationManager getConfigurationManager() public void destroy() { refreshExecutor.shutdownNow(); - resourceGroupRefreshExecutor.shutdownNow(); } @PostConstruct @@ -236,149 +224,44 @@ else if (elapsedSeconds < 0) { } for (RootInternalResourceGroup group : rootGroups) { - while (group != null) { - try { - if (elapsedSeconds > 0) { - group.generateCpuQuota(elapsedSeconds); - } - } - catch (RuntimeException e) { - log.error(e, "Exception while generating cpu quota for %s", group); - } - try { - group.setTaskLimitExceeded(taskLimitExceeded.get()); - group.processQueuedQueries(); - } - catch (RuntimeException e) { - log.error(e, "Exception while processing queued queries for %s version %s", group, group.getVersion()); - } - if (group.getNext().isPresent()) { - group = (RootInternalResourceGroup) group.getNext().get(); - } - else { - group = null; - } - } - } - } - - @VisibleForTesting - public synchronized void refreshInternalResourceGroups() - { - boolean requeue = false; - boolean versionChanged = false; - int specVersion = configurationManager.get().getSpecVersion(); - for (InternalResourceGroup rootGroup : rootGroups) { - if (rootGroup.getLatest().getVersion() < specVersion) { - versionChanged = true; - break; - } - } - - if (!versionChanged) { - return; - } - - for (ResourceGroupId id : groups.keySet()) { - InternalResourceGroup group = groups.get(id).getLatest(); - if (group.getVersion() != specVersion) { - requeue = true; - } - createGroupIfNecessary(group, refreshResourceGroupExecutor, specVersion); - } - - if (requeue) { - for (ResourceGroupId id : groups.keySet()) { - InternalResourceGroup group = groups.get(id).getLatest(); - if (group.getPrev().isPresent()) { - group.getPrev().get().requeueToNext(); + try { + if (elapsedSeconds > 0) { + group.generateCpuQuota(elapsedSeconds); } } - } - //Clean up process to remove old versions if they no longer have running/queued queries - for (InternalResourceGroup group : rootGroups) { - while (group.getNext().isPresent()) { - group.purgeExpiredResourceGroups(); - group = group.getNext().get(); + catch (RuntimeException e) { + log.error(e, "Exception while generation cpu quota for %s", group); } - } - } - - @VisibleForTesting - public List getRootGroups() - { - return ImmutableList.copyOf(rootGroups); - } - - private void createGroupIfNecessary(InternalResourceGroup resourceGroup, Executor executor, int version) - { - synchronized (this) { - if (resourceGroup.getVersion() == version) { - return; + try { + group.setTaskLimitExceeded(taskLimitExceeded.get()); + group.processQueuedQueries(); } - - InternalResourceGroup newGroup; - ResourceGroupId resourceGroupId = resourceGroup.getId(); - if (resourceGroupId.getParent().isPresent()) { - createGroupIfNecessary(groups.get(resourceGroupId.getParent().get()).getLatest(), executor, version); - InternalResourceGroup parent = groups.get(resourceGroupId.getParent().get()).getLatest(); - newGroup = parent.getOrCreateSubGroup(resourceGroupId.getLastSegment(), resourceGroup.isStaticResourceGroup(), version); + catch (RuntimeException e) { + log.error(e, "Exception while processing queued queries for %s", group); } - else { - newGroup = new RootInternalResourceGroup(resourceGroup.getId().getSegments().get(0), this::exportGroup, executor, resourceGroup.getRoot(), version); - } - configurationManager.get().configure(newGroup); - resourceGroup.setNext(newGroup); - newGroup.setPrev(resourceGroup); - groups.put(resourceGroupId, newGroup); } } - private void createGroupIfNecessary(SelectionContext context, Executor executor) + private synchronized void createGroupIfNecessary(SelectionContext context, Executor executor) { - synchronized (this) { - ResourceGroupId id = context.getResourceGroupId(); - int specVersion = configurationManager.get().getSpecVersion(); - if (!groups.containsKey(id)) { - InternalResourceGroup group; - if (id.getParent().isPresent()) { - createGroupIfNecessary(new SelectionContext<>(id.getParent().get(), context.getContext()), executor); - InternalResourceGroup parent = groups.get(id.getParent().get()).getLatest(); - requireNonNull(parent, "parent is null"); - // parent segments size equals to subgroup segment index - int subGroupSegmentIndex = parent.getId().getSegments().size(); - group = parent.getOrCreateSubGroup(id.getLastSegment(), !context.getFirstDynamicSegmentPosition().equals(OptionalInt.of(subGroupSegmentIndex)), specVersion); - } - else { - RootInternalResourceGroup root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, specVersion); - group = root; - rootGroups.add(root); - } - configurationManager.get().configure(group, context); - checkState(groups.put(id, group) == null, "Unexpected existing resource group"); + ResourceGroupId id = context.getResourceGroupId(); + if (!groups.containsKey(id)) { + InternalResourceGroup group; + if (id.getParent().isPresent()) { + createGroupIfNecessary(new SelectionContext<>(id.getParent().get(), context.getContext()), executor); + InternalResourceGroup parent = groups.get(id.getParent().get()); + requireNonNull(parent, "parent is null"); + // parent segments size equals to subgroup segment index + int subGroupSegmentIndex = parent.getId().getSegments().size(); + group = parent.getOrCreateSubGroup(id.getLastSegment(), !context.getFirstDynamicSegmentPosition().equals(OptionalInt.of(subGroupSegmentIndex))); } else { - InternalResourceGroup group = groups.get(id).getLatest(); - - if (specVersion != group.getVersion()) { - InternalResourceGroup newGroup; - if (id.getParent().isPresent()) { - createGroupIfNecessary(new SelectionContext<>(id.getParent().get(), context.getContext()), executor); - InternalResourceGroup parent = groups.get(id.getParent().get()).getLatest(); - requireNonNull(parent, "parent is null"); - // parent segments size equals to subgroup segment index - int subGroupSegmentIndex = parent.getId().getSegments().size(); - newGroup = parent.getOrCreateSubGroup(id.getLastSegment(), !context.getFirstDynamicSegmentPosition().equals(OptionalInt.of(subGroupSegmentIndex)), specVersion); - } - else { - newGroup = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, group.getRoot(), specVersion); - } - configurationManager.get().configure(newGroup, context); - group.setNext(newGroup); - newGroup.setPrev(group); - groups.put(id, newGroup); - } + RootInternalResourceGroup root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor); + group = root; + rootGroups.add(root); } + configurationManager.get().configure(group, context); + checkState(groups.put(id, group) == null, "Unexpected existing resource group"); } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/LegacyResourceGroupConfigurationManager.java b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/LegacyResourceGroupConfigurationManager.java index 5849e73c2d1d6..5e0fb319660a5 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/LegacyResourceGroupConfigurationManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/LegacyResourceGroupConfigurationManager.java @@ -55,26 +55,9 @@ public void configure(ResourceGroup group, SelectionContext criteri group.setHardConcurrencyLimit(hardConcurrencyLimit); } - @Override - public void configure(ResourceGroup group) - { - } - - @Override - public boolean dynamicReloadSupported() - { - return false; - } - @Override public Optional> match(SelectionCriteria criteria) { return Optional.of(new SelectionContext<>(GLOBAL, VoidContext.NONE)); } - - @Override - public int getSpecVersion() - { - return 0; - } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java index 8cf0c148f8e1c..1abf3537acffa 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/CoordinatorModule.java @@ -52,7 +52,6 @@ import com.facebook.presto.execution.DropViewTask; import com.facebook.presto.execution.ExplainAnalyzeContext; import com.facebook.presto.execution.ForQueryExecution; -import com.facebook.presto.execution.ForQueryScheduling; import com.facebook.presto.execution.GrantRolesTask; import com.facebook.presto.execution.GrantTask; import com.facebook.presto.execution.PrepareTask; @@ -295,8 +294,6 @@ protected void setup(Binder binder) binder.bind(ScheduledExecutorService.class).annotatedWith(ForScheduler.class) .toInstance(newSingleThreadScheduledExecutor(threadsNamed("stage-scheduler"))); - binder.bind(ExecutorService.class).annotatedWith(ForQueryScheduling.class) - .toInstance(newCachedThreadPool(threadsNamed("query-scheduler-%s"))); // query execution binder.bind(ExecutorService.class).annotatedWith(ForQueryExecution.class) diff --git a/presto-main/src/main/java/com/facebook/presto/server/ResourceGroupInfo.java b/presto-main/src/main/java/com/facebook/presto/server/ResourceGroupInfo.java index a5a8cc500797c..07065285b7fbc 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ResourceGroupInfo.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ResourceGroupInfo.java @@ -52,8 +52,6 @@ public class ResourceGroupInfo private final List subGroups; private final List runningQueries; - private final int version; - @JsonCreator public ResourceGroupInfo( @JsonProperty("id") ResourceGroupId id, @@ -69,8 +67,7 @@ public ResourceGroupInfo( @JsonProperty("numRunningQueries") int numRunningQueries, @JsonProperty("numEligibleSubGroups") int numEligibleSubGroups, @JsonProperty("subGroups") List subGroups, - @JsonProperty("runningQueries") List runningQueries, - @JsonProperty("version") int version) + @JsonProperty("runningQueries") List runningQueries) { this.id = requireNonNull(id, "id is null"); this.state = requireNonNull(state, "state is null"); @@ -92,8 +89,6 @@ public ResourceGroupInfo( this.runningQueries = runningQueries; this.subGroups = subGroups; - - this.version = version; } @JsonProperty @@ -195,10 +190,4 @@ public List getSubGroups() { return subGroups; } - - @JsonProperty - public int getVersion() - { - return version; - } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java index f288ee067e7d3..c614a7d0a83e7 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java @@ -71,13 +71,13 @@ public static class BenchmarkData @Setup public void setup() { - root = new RootInternalResourceGroup("root", (group, export) -> {}, executor, 1); + root = new RootInternalResourceGroup("root", (group, export) -> {}, executor); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(queries); root.setHardConcurrencyLimit(queries); InternalResourceGroup group = root; for (int i = 0; i < children; i++) { - group = root.getOrCreateSubGroup(String.valueOf(i), true, 1); + group = root.getOrCreateSubGroup(String.valueOf(i), true); group.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group.setMaxQueuedQueries(queries); group.setHardConcurrencyLimit(queries); diff --git a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java index fdd862e0ba2df..52de2d47d7703 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java @@ -62,7 +62,7 @@ public class TestResourceGroups @Test(timeOut = 10_000) public void testQueueFull() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(1); root.setHardConcurrencyLimit(1); @@ -81,19 +81,19 @@ public void testQueueFull() @Test(timeOut = 10_000) public void testFairEligibility() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(1); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(4); group1.setHardConcurrencyLimit(1); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(4); group2.setHardConcurrencyLimit(1); - InternalResourceGroup group3 = root.getOrCreateSubGroup("3", true, 0); + InternalResourceGroup group3 = root.getOrCreateSubGroup("3", true); group3.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group3.setMaxQueuedQueries(4); group3.setHardConcurrencyLimit(1); @@ -136,15 +136,15 @@ public void testFairEligibility() @Test public void testSetSchedulingPolicy() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(1); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(4); group1.setHardConcurrencyLimit(2); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(4); group2.setHardConcurrencyLimit(2); @@ -162,31 +162,31 @@ public void testSetSchedulingPolicy() assertEquals(query2a.getState(), QUEUED); assertEquals(root.getInfo().getNumEligibleSubGroups(), 2); - assertEquals(root.getOrCreateSubGroup("1", true, 0).getQueuedQueries(), 2); - assertEquals(root.getOrCreateSubGroup("2", true, 0).getQueuedQueries(), 1); + assertEquals(root.getOrCreateSubGroup("1", true).getQueuedQueries(), 2); + assertEquals(root.getOrCreateSubGroup("2", true).getQueuedQueries(), 1); assertEquals(root.getSchedulingPolicy(), FAIR); root.setSchedulingPolicy(QUERY_PRIORITY); assertEquals(root.getInfo().getNumEligibleSubGroups(), 2); - assertEquals(root.getOrCreateSubGroup("1", true, 0).getQueuedQueries(), 2); - assertEquals(root.getOrCreateSubGroup("2", true, 0).getQueuedQueries(), 1); + assertEquals(root.getOrCreateSubGroup("1", true).getQueuedQueries(), 2); + assertEquals(root.getOrCreateSubGroup("2", true).getQueuedQueries(), 1); assertEquals(root.getSchedulingPolicy(), QUERY_PRIORITY); - assertEquals(root.getOrCreateSubGroup("1", true, 0).getSchedulingPolicy(), QUERY_PRIORITY); - assertEquals(root.getOrCreateSubGroup("2", true, 0).getSchedulingPolicy(), QUERY_PRIORITY); + assertEquals(root.getOrCreateSubGroup("1", true).getSchedulingPolicy(), QUERY_PRIORITY); + assertEquals(root.getOrCreateSubGroup("2", true).getSchedulingPolicy(), QUERY_PRIORITY); } @Test(timeOut = 10_000) public void testFairQueuing() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(1); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(4); group1.setHardConcurrencyLimit(2); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(4); group2.setHardConcurrencyLimit(2); @@ -220,7 +220,7 @@ public void testFairQueuing() @Test(timeOut = 10_000) public void testMemoryLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, BYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(3); @@ -245,11 +245,11 @@ public void testMemoryLimit() @Test public void testSubgroupMemoryLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(10, BYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(3); - InternalResourceGroup subgroup = root.getOrCreateSubGroup("subgroup", true, 0); + InternalResourceGroup subgroup = root.getOrCreateSubGroup("subgroup", true); subgroup.setSoftMemoryLimit(new DataSize(1, BYTE)); subgroup.setMaxQueuedQueries(4); subgroup.setHardConcurrencyLimit(3); @@ -275,7 +275,7 @@ public void testSubgroupMemoryLimit() @Test(timeOut = 10_000) public void testSoftCpuLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, BYTE)); root.setSoftCpuLimit(new Duration(1, SECONDS)); root.setHardCpuLimit(new Duration(2, SECONDS)); @@ -309,7 +309,7 @@ public void testSoftCpuLimit() @Test(timeOut = 10_000) public void testHardCpuLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, BYTE)); root.setHardCpuLimit(new Duration(1, SECONDS)); root.setCpuQuotaGenerationMillisPerSecond(2000); @@ -334,17 +334,17 @@ public void testHardCpuLimit() @Test(timeOut = 10_000) public void testPriorityScheduling() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(100); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(QUERY_PRIORITY); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(100); group1.setHardConcurrencyLimit(1); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(100); group2.setHardConcurrencyLimit(1); @@ -384,18 +384,18 @@ public void testPriorityScheduling() @Test(timeOut = 10_000) public void testWeightedScheduling() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(WEIGHTED); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(2); group1.setHardConcurrencyLimit(2); group1.setSoftConcurrencyLimit(2); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(2); group2.setHardConcurrencyLimit(2); @@ -433,21 +433,21 @@ public void testWeightedScheduling() @Test(timeOut = 10_000) public void testWeightedFairScheduling() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(50); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(WEIGHTED_FAIR); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(50); group1.setHardConcurrencyLimit(2); group1.setSoftConcurrencyLimit(2); group1.setSchedulingWeight(1); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(50); group2.setHardConcurrencyLimit(2); @@ -476,28 +476,28 @@ public void testWeightedFairScheduling() @Test(timeOut = 10_000) public void testWeightedFairSchedulingEqualWeights() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(50); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(WEIGHTED_FAIR); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(50); group1.setHardConcurrencyLimit(2); group1.setSoftConcurrencyLimit(2); group1.setSchedulingWeight(1); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(50); group2.setHardConcurrencyLimit(2); group2.setSoftConcurrencyLimit(2); group2.setSchedulingWeight(1); - InternalResourceGroup group3 = root.getOrCreateSubGroup("3", true, 0); + InternalResourceGroup group3 = root.getOrCreateSubGroup("3", true); group3.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group3.setMaxQueuedQueries(50); group3.setHardConcurrencyLimit(2); @@ -535,21 +535,21 @@ public void testWeightedFairSchedulingEqualWeights() @Test(timeOut = 10_000) public void testWeightedFairSchedulingNoStarvation() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(50); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(WEIGHTED_FAIR); - InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true, 0); + InternalResourceGroup group1 = root.getOrCreateSubGroup("1", true); group1.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group1.setMaxQueuedQueries(50); group1.setHardConcurrencyLimit(2); group1.setSoftConcurrencyLimit(2); group1.setSchedulingWeight(1); - InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true, 0); + InternalResourceGroup group2 = root.getOrCreateSubGroup("2", true); group2.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); group2.setMaxQueuedQueries(50); group2.setHardConcurrencyLimit(2); @@ -576,41 +576,41 @@ public void testWeightedFairSchedulingNoStarvation() @Test public void testGetInfo() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(40); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(WEIGHTED); - InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true, 0); + InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true); rootA.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootA.setMaxQueuedQueries(20); rootA.setHardConcurrencyLimit(2); - InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true, 0); + InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true); rootB.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootB.setMaxQueuedQueries(20); rootB.setHardConcurrencyLimit(2); rootB.setSchedulingWeight(2); rootB.setSchedulingPolicy(QUERY_PRIORITY); - InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true, 0); + InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true); rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAX.setMaxQueuedQueries(10); rootAX.setHardConcurrencyLimit(10); - InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true, 0); + InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true); rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAY.setMaxQueuedQueries(10); rootAY.setHardConcurrencyLimit(10); - InternalResourceGroup rootBX = rootB.getOrCreateSubGroup("x", true, 0); + InternalResourceGroup rootBX = rootB.getOrCreateSubGroup("x", true); rootBX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootBX.setMaxQueuedQueries(10); rootBX.setHardConcurrencyLimit(10); - InternalResourceGroup rootBY = rootB.getOrCreateSubGroup("y", true, 0); + InternalResourceGroup rootBY = rootB.getOrCreateSubGroup("y", true); rootBY.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootBY.setMaxQueuedQueries(10); rootBY.setHardConcurrencyLimit(10); @@ -666,30 +666,30 @@ public void testGetInfo() @Test public void testGetResourceGroupStateInfo() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, GIGABYTE)); root.setMaxQueuedQueries(40); root.setHardConcurrencyLimit(10); root.setSchedulingPolicy(WEIGHTED); - InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true, 0); + InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true); rootA.setSoftMemoryLimit(new DataSize(10, MEGABYTE)); rootA.setMaxQueuedQueries(20); rootA.setHardConcurrencyLimit(0); - InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true, 0); + InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true); rootB.setSoftMemoryLimit(new DataSize(5, MEGABYTE)); rootB.setMaxQueuedQueries(20); rootB.setHardConcurrencyLimit(1); rootB.setSchedulingWeight(2); rootB.setSchedulingPolicy(QUERY_PRIORITY); - InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true, 0); + InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true); rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAX.setMaxQueuedQueries(10); rootAX.setHardConcurrencyLimit(10); - InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true, 0); + InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true); rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAY.setMaxQueuedQueries(10); rootAY.setHardConcurrencyLimit(10); @@ -734,18 +734,18 @@ public void testGetResourceGroupStateInfo() @Test public void testGetStaticResourceGroupInfo() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 0); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, GIGABYTE)); root.setMaxQueuedQueries(100); root.setHardConcurrencyLimit(10); root.setSchedulingPolicy(WEIGHTED); - InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true, 0); + InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true); rootA.setSoftMemoryLimit(new DataSize(10, MEGABYTE)); rootA.setMaxQueuedQueries(100); rootA.setHardConcurrencyLimit(0); - InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true, 0); + InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true); rootB.setSoftMemoryLimit(new DataSize(5, MEGABYTE)); rootB.setMaxQueuedQueries(100); rootB.setHardConcurrencyLimit(1); @@ -753,25 +753,25 @@ public void testGetStaticResourceGroupInfo() rootB.setSchedulingPolicy(QUERY_PRIORITY); // x is a dynamic resource group - InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", false, 0); + InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", false); rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAX.setMaxQueuedQueries(10); rootAX.setHardConcurrencyLimit(10); - InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true, 0); + InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true); rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAY.setMaxQueuedQueries(10); rootAY.setHardConcurrencyLimit(10); for (int i = 0; i < 10; i++) { - InternalResourceGroup subGroup = rootAX.getOrCreateSubGroup("ax" + i, false, 0); + InternalResourceGroup subGroup = rootAX.getOrCreateSubGroup("ax" + i, false); subGroup.setSoftMemoryLimit(new DataSize(i, MEGABYTE)); subGroup.setMaxQueuedQueries(10); subGroup.setHardConcurrencyLimit(10); } for (int i = 0; i < 10; i++) { - fillGroupTo(rootAX.getOrCreateSubGroup("ax" + i, true, 0), ImmutableSet.of(), 1, false); + fillGroupTo(rootAX.getOrCreateSubGroup("ax" + i, true), ImmutableSet.of(), 1, false); } fillGroupTo(rootAY, ImmutableSet.of(), 5, false); fillGroupTo(rootB, ImmutableSet.of(), 10, true); @@ -811,38 +811,38 @@ private Optional getResourceGroupInfoForId(InternalResourceGr @Test public void testGetBlockedQueuedQueries() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(40); // Start with zero capacity, so that nothing starts running until we've added all the queries root.setHardConcurrencyLimit(0); - InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true, 0); + InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true); rootA.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootA.setMaxQueuedQueries(20); rootA.setHardConcurrencyLimit(8); - InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true, 0); + InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true); rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAX.setMaxQueuedQueries(10); rootAX.setHardConcurrencyLimit(8); - InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true, 0); + InternalResourceGroup rootAY = rootA.getOrCreateSubGroup("y", true); rootAY.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAY.setMaxQueuedQueries(10); rootAY.setHardConcurrencyLimit(5); - InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true, 0); + InternalResourceGroup rootB = root.getOrCreateSubGroup("b", true); rootB.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootB.setMaxQueuedQueries(20); rootB.setHardConcurrencyLimit(8); - InternalResourceGroup rootBX = rootB.getOrCreateSubGroup("x", true, 0); + InternalResourceGroup rootBX = rootB.getOrCreateSubGroup("x", true); rootBX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootBX.setMaxQueuedQueries(10); rootBX.setHardConcurrencyLimit(8); - InternalResourceGroup rootBY = rootB.getOrCreateSubGroup("y", true, 0); + InternalResourceGroup rootBY = rootB.getOrCreateSubGroup("y", true); rootBY.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootBY.setMaxQueuedQueries(10); rootBY.setHardConcurrencyLimit(5); diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java b/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java index 2bd4f3db242a8..18a8ae9d23121 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java @@ -45,18 +45,18 @@ public class TestQueryStateInfo @Test public void testQueryStateInfo() { - InternalResourceGroup.RootInternalResourceGroup root = new InternalResourceGroup.RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), 1); + InternalResourceGroup.RootInternalResourceGroup root = new InternalResourceGroup.RootInternalResourceGroup("root", (group, export) -> {}, directExecutor()); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(40); root.setHardConcurrencyLimit(0); root.setSchedulingPolicy(WEIGHTED); - InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true, 0); + InternalResourceGroup rootA = root.getOrCreateSubGroup("a", true); rootA.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootA.setMaxQueuedQueries(20); rootA.setHardConcurrencyLimit(0); - InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true, 0); + InternalResourceGroup rootAX = rootA.getOrCreateSubGroup("x", true); rootAX.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); rootAX.setMaxQueuedQueries(10); rootAX.setHardConcurrencyLimit(0); diff --git a/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/FileResourceGroupConfigurationManager.java b/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/FileResourceGroupConfigurationManager.java index b00396a0df6e2..2ab98f70f900f 100644 --- a/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/FileResourceGroupConfigurationManager.java +++ b/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/FileResourceGroupConfigurationManager.java @@ -108,17 +108,6 @@ public void configure(ResourceGroup group, SelectionContext context configureGroup(group, entry.getValue()); } - @Override - public void configure(ResourceGroup group) - { - } - - @Override - public boolean dynamicReloadSupported() - { - return false; - } - @Override public Optional> match(SelectionCriteria criteria) { @@ -129,12 +118,6 @@ public Optional> match(SelectionCriteria criteria) .findFirst(); } - @Override - public int getSpecVersion() - { - return 0; - } - @VisibleForTesting public List getSelectors() { diff --git a/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/db/DbResourceGroupConfigurationManager.java b/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/db/DbResourceGroupConfigurationManager.java index 6ff4c27f5653a..9a3c3af4e3a0a 100644 --- a/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/db/DbResourceGroupConfigurationManager.java +++ b/presto-resource-group-managers/src/main/java/com/facebook/presto/resourceGroups/db/DbResourceGroupConfigurationManager.java @@ -54,7 +54,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -78,7 +77,6 @@ public class DbResourceGroupConfigurationManager @GuardedBy("this") private Map resourceGroupSpecs = new HashMap<>(); private final ConcurrentMap> configuredGroups = new ConcurrentHashMap<>(); - private final ConcurrentMap configuredGroupsTemplate = new ConcurrentHashMap<>(); private final AtomicReference> rootGroups = new AtomicReference<>(ImmutableList.of()); private final AtomicReference> selectors = new AtomicReference<>(); private final AtomicReference> cpuQuotaPeriod = new AtomicReference<>(Optional.empty()); @@ -88,7 +86,6 @@ public class DbResourceGroupConfigurationManager private final String environment; private final Duration maxRefreshInterval; private final boolean exactMatchSelectorEnabled; - private AtomicInteger version; private final CounterStat refreshFailures = new CounterStat(); @@ -109,7 +106,6 @@ public DbResourceGroupConfigurationManager(ClusterMemoryPoolManager memoryPoolMa if (exactMatchSelectorEnabled) { this.dao.createExactMatchSelectorsTable(); } - version = new AtomicInteger(0); load(); } @@ -152,29 +148,12 @@ public void configure(ResourceGroup group, SelectionContext criteri if (groups.putIfAbsent(group.getId(), group) == null) { // If a new spec replaces the spec returned from getMatchingSpec the group will be reconfigured on the next run of load(). configuredGroups.computeIfAbsent(entry.getKey(), v -> new LinkedList<>()).add(group.getId()); - configuredGroupsTemplate.put(group.getId(), entry.getKey()); } synchronized (getRootGroup(group.getId())) { configureGroup(group, entry.getValue()); } } - @Override - public void configure(ResourceGroup group) - { - ResourceGroupIdTemplate resourceGroupIdTemplate = configuredGroupsTemplate.get(group.getId()); - ResourceGroupSpec resourceGroupSpec = resourceGroupSpecs.get(resourceGroupIdTemplate); - synchronized (getRootGroup(group.getId())) { - configureGroup(group, resourceGroupSpec); - } - } - - @Override - public boolean dynamicReloadSupported() - { - return true; - } - @Override public Optional> match(SelectionCriteria criteria) { @@ -191,12 +170,6 @@ public Optional> match(SelectionCriteria criteria) .findFirst(); } - @Override - public synchronized int getSpecVersion() - { - return version.get(); - } - @VisibleForTesting public List getSelectors() { @@ -223,45 +196,14 @@ public synchronized void load() ManagerSpec managerSpec = specsFromDb.getKey(); Map resourceGroupSpecs = specsFromDb.getValue(); Set changedSpecs = new HashSet<>(); - Set newSpecs = new HashSet<>(); - Set deletedSpecTemplates = Sets.difference(this.resourceGroupSpecs.keySet(), resourceGroupSpecs.keySet()); + Set deletedSpecs = Sets.difference(this.resourceGroupSpecs.keySet(), resourceGroupSpecs.keySet()); + for (Map.Entry entry : resourceGroupSpecs.entrySet()) { - if (!this.resourceGroupSpecs.containsKey(entry.getKey())) { - newSpecs.add(entry.getKey()); - } - else if (!entry.getValue().sameConfig(this.resourceGroupSpecs.get(entry.getKey()))) { + if (!entry.getValue().sameConfig(this.resourceGroupSpecs.get(entry.getKey()))) { changedSpecs.add(entry.getKey()); } } - int deletedTemplateSize = 0; - for (ResourceGroupIdTemplate deletedSpecTemplate : deletedSpecTemplates) { - ResourceGroupSpec deletedSpec = this.resourceGroupSpecs.get(deletedSpecTemplate); - //If the spec is already deleted, skip resetting it again - if (this.resourceGroupSpecs.get(deletedSpecTemplate).getHardConcurrencyLimit() > 0) { - ResourceGroupSpecBuilder specBuilder = new ResourceGroupSpecBuilder( - 0, - deletedSpec.getName(), - "0MB", - 0, - Optional.empty(), - 0, - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty(), - Optional.empty()); - resourceGroupSpecs.put(deletedSpecTemplate, specBuilder.build()); - deletedTemplateSize++; - } - } - - //Bump the version if changedSpecs, newSpecs or deletedSpecs has any changes: - if (!changedSpecs.isEmpty() || !newSpecs.isEmpty() || deletedTemplateSize > 0) { - version.getAndIncrement(); - } - this.resourceGroupSpecs = resourceGroupSpecs; this.cpuQuotaPeriod.set(managerSpec.getCpuQuotaPeriod()); this.rootGroups.set(managerSpec.getRootGroups()); @@ -276,13 +218,13 @@ else if (!entry.getValue().sameConfig(this.resourceGroupSpecs.get(entry.getKey() this.selectors.set(selectors); } + configureChangedGroups(changedSpecs); + disableDeletedGroups(deletedSpecs); + if (lastRefresh.get() > 0) { - if (deletedTemplateSize > 0) { - for (ResourceGroupIdTemplate deleted : deletedSpecTemplates) { - log.info("Resource group spec deleted %s", deleted); - } + for (ResourceGroupIdTemplate deleted : deletedSpecs) { + log.info("Resource group spec deleted %s", deleted); } - for (ResourceGroupIdTemplate changed : changedSpecs) { log.info("Resource group spec %s changed to %s", changed, resourceGroupSpecs.get(changed)); } diff --git a/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/TestingResourceGroup.java b/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/TestingResourceGroup.java index 13f45ad004afa..11bab68b5d646 100644 --- a/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/TestingResourceGroup.java +++ b/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/TestingResourceGroup.java @@ -166,10 +166,4 @@ public void setJmxExport(boolean export) { jmxExport = export; } - - @Override - public int getVersion() - { - return 0; - } } diff --git a/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/db/TestDbResourceGroupConfigurationManager.java b/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/db/TestDbResourceGroupConfigurationManager.java index 85e5a36d0b886..a059c4a12f930 100644 --- a/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/db/TestDbResourceGroupConfigurationManager.java +++ b/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/db/TestDbResourceGroupConfigurationManager.java @@ -84,7 +84,7 @@ public void testEnvironments() DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), prodEnvironment); List groups = manager.getRootGroups(); assertEquals(groups.size(), 1); - InternalResourceGroup prodGlobal = new InternalResourceGroup.RootInternalResourceGroup("prod_global", (group, export) -> {}, directExecutor(), 1); + InternalResourceGroup prodGlobal = new InternalResourceGroup.RootInternalResourceGroup("prod_global", (group, export) -> {}, directExecutor()); manager.configure(prodGlobal, new SelectionContext<>(prodGlobal.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); assertEqualsResourceGroup(prodGlobal, "10MB", 1000, 100, 100, WEIGHTED, DEFAULT_WEIGHT, true, new Duration(1, HOURS), new Duration(1, DAYS)); assertEquals(manager.getSelectors().size(), 1); @@ -95,7 +95,7 @@ public void testEnvironments() // check the dev configuration manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), devEnvironment); assertEquals(groups.size(), 1); - InternalResourceGroup devGlobal = new InternalResourceGroup.RootInternalResourceGroup("dev_global", (group, export) -> {}, directExecutor(), 1); + InternalResourceGroup devGlobal = new InternalResourceGroup.RootInternalResourceGroup("dev_global", (group, export) -> {}, directExecutor()); manager.configure(devGlobal, new SelectionContext<>(prodGlobal.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); assertEqualsResourceGroup(devGlobal, "1MB", 1000, 100, 100, WEIGHTED, DEFAULT_WEIGHT, true, new Duration(1, HOURS), new Duration(1, DAYS)); assertEquals(manager.getSelectors().size(), 1); @@ -118,11 +118,11 @@ public void testConfiguration() dao.insertSelector(2, 1, null, null, null, null, null); DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT); AtomicBoolean exported = new AtomicBoolean(); - InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor(), 1); + InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor()); manager.configure(global, new SelectionContext<>(global.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); assertEqualsResourceGroup(global, "1MB", 1000, 100, 100, WEIGHTED, DEFAULT_WEIGHT, true, new Duration(1, HOURS), new Duration(1, DAYS)); exported.set(false); - InternalResourceGroup sub = global.getOrCreateSubGroup("sub", true, 0); + InternalResourceGroup sub = global.getOrCreateSubGroup("sub", true); manager.configure(sub, new SelectionContext<>(sub.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); assertEqualsResourceGroup(sub, "2MB", 4, 3, 3, FAIR, 5, false, new Duration(Long.MAX_VALUE, MILLISECONDS), new Duration(Long.MAX_VALUE, MILLISECONDS)); } @@ -178,10 +178,48 @@ public void testMissing() dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h"); dao.insertSelector(2, 1, null, null, null, null, null); DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT); - InternalResourceGroup missing = new InternalResourceGroup.RootInternalResourceGroup("missing", (group, export) -> {}, directExecutor(), 1); + InternalResourceGroup missing = new InternalResourceGroup.RootInternalResourceGroup("missing", (group, export) -> {}, directExecutor()); manager.configure(missing, new SelectionContext<>(missing.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); } + @Test(timeOut = 60_000) + public void testReconfig() + throws Exception + { + H2DaoProvider daoProvider = setup("test_reconfig"); + H2ResourceGroupsDao dao = daoProvider.get(); + dao.createResourceGroupsGlobalPropertiesTable(); + dao.createResourceGroupsTable(); + dao.createSelectorsTable(); + dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", null, ENVIRONMENT); + dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, 1L, ENVIRONMENT); + dao.insertSelector(2, 1, null, null, null, null, null); + dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h"); + DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT); + manager.start(); + AtomicBoolean exported = new AtomicBoolean(); + InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor()); + manager.configure(global, new SelectionContext<>(global.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); + InternalResourceGroup globalSub = global.getOrCreateSubGroup("sub", true); + manager.configure(globalSub, new SelectionContext<>(globalSub.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); + // Verify record exists + assertEqualsResourceGroup(globalSub, "2MB", 4, 3, 3, FAIR, 5, false, new Duration(Long.MAX_VALUE, MILLISECONDS), new Duration(Long.MAX_VALUE, MILLISECONDS)); + dao.updateResourceGroup(2, "sub", "3MB", 2, 1, 1, "weighted", 6, true, "1h", "1d", 1L, ENVIRONMENT); + do { + MILLISECONDS.sleep(500); + } + while (globalSub.getJmxExport() == false); + // Verify update + assertEqualsResourceGroup(globalSub, "3MB", 2, 1, 1, WEIGHTED, 6, true, new Duration(1, HOURS), new Duration(1, DAYS)); + // Verify delete + dao.deleteSelectors(2); + dao.deleteResourceGroup(2); + do { + MILLISECONDS.sleep(500); + } + while (globalSub.getMaxQueuedQueries() != 0 || globalSub.getHardConcurrencyLimit() != 0); + } + @Test public void testExactMatchSelector() { diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index bf92a29ec0813..6073cc0ded095 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -41,7 +41,6 @@ import com.facebook.presto.execution.DataDefinitionTask; import com.facebook.presto.execution.ExecutionFailureInfo; import com.facebook.presto.execution.ExplainAnalyzeContext; -import com.facebook.presto.execution.ForQueryScheduling; import com.facebook.presto.execution.QueryIdGenerator; import com.facebook.presto.execution.QueryManager; import com.facebook.presto.execution.QueryManagerConfig; @@ -295,7 +294,6 @@ protected void setup(Binder binder) ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("presto-spark-executor-%s")); binder.bind(Executor.class).toInstance(executor); binder.bind(ExecutorService.class).toInstance(executor); - binder.bind(ExecutorService.class).annotatedWith(ForQueryScheduling.class).toInstance(executor); binder.bind(ScheduledExecutorService.class).toInstance(newScheduledThreadPool(0, daemonThreadsNamed("presto-spark-scheduled-executor-%s"))); // task executor diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroup.java b/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroup.java index 6bf7a4702ba4c..ff53d66b7b5e0 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroup.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroup.java @@ -95,6 +95,4 @@ public interface ResourceGroup * Whether to export statistics about this group and allow configuration via JMX. */ void setJmxExport(boolean export); - - int getVersion(); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroupConfigurationManager.java b/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroupConfigurationManager.java index 663a660d97c67..eb9aedf6b6773 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroupConfigurationManager.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/resourceGroups/ResourceGroupConfigurationManager.java @@ -37,14 +37,8 @@ public interface ResourceGroupConfigurationManager */ void configure(ResourceGroup group, SelectionContext criteria); - void configure(ResourceGroup group); - - boolean dynamicReloadSupported(); - /** * This method is called for every query that is submitted, so it should be fast. */ Optional> match(SelectionCriteria criteria); - - int getSpecVersion(); } diff --git a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java index c4199984692f5..89b8544e44339 100644 --- a/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java +++ b/presto-tests/src/test/java/com/facebook/presto/execution/resourceGroups/db/TestQueuesDb.java @@ -16,7 +16,6 @@ import com.facebook.presto.Session; import com.facebook.presto.dispatcher.DispatchManager; import com.facebook.presto.execution.QueryManager; -import com.facebook.presto.execution.resourceGroups.InternalResourceGroup; import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager; import com.facebook.presto.resourceGroups.db.DbResourceGroupConfigurationManager; import com.facebook.presto.resourceGroups.db.H2ResourceGroupsDao; @@ -32,7 +31,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -65,7 +63,6 @@ import static com.facebook.presto.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -126,20 +123,12 @@ public void testBasic() // Update db to allow for 1 more running query in dashboard resource group dao.updateResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, null, null, null, null, null, 1L, TEST_ENVIRONMENT); dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, null, null, null, null, 3L, TEST_ENVIRONMENT); - SECONDS.sleep(1); - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - SECONDS.sleep(2); waitForQueryState(queryRunner, secondDashboardQuery, RUNNING); QueryId thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); waitForQueryState(queryRunner, thirdDashboardQuery, QUEUED); waitForRunningQueryCount(queryRunner, 2); - // submit first non "dashboard" query QueryId firstNonDashboardQuery = createQuery(queryRunner, adhocSession(), LONG_LASTING_QUERY); - SECONDS.sleep(1); // wait for the first non "dashboard" query to start waitForQueryState(queryRunner, firstNonDashboardQuery, RUNNING); waitForRunningQueryCount(queryRunner, 3); @@ -151,7 +140,6 @@ public void testBasic() // cancel first "dashboard" query, the second "dashboard" query and second non "dashboard" query should start running cancelQuery(queryRunner, firstDashboardQuery); waitForQueryState(queryRunner, firstDashboardQuery, FAILED); - SECONDS.sleep(5); waitForQueryState(queryRunner, thirdDashboardQuery, RUNNING); waitForRunningQueryCount(queryRunner, 4); waitForCompleteQueryCount(queryRunner, 1); @@ -180,17 +168,15 @@ public void testTooManyQueries() QueryId thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); waitForQueryState(queryRunner, thirdDashboardQuery, FAILED); - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - // Allow one more query to run and resubmit third query dao.updateResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, null, null, null, null, null, 1L, TEST_ENVIRONMENT); dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, null, null, null, null, 3L, TEST_ENVIRONMENT); + InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); + DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); + // Trigger reload to make the test more deterministic dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - waitForQueryState(queryRunner, secondDashboardQuery, RUNNING); thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); waitForQueryState(queryRunner, thirdDashboardQuery, QUEUED); @@ -198,7 +184,6 @@ public void testTooManyQueries() // Lower running queries in dashboard resource groups and reload the config dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 1, null, null, null, null, null, 3L, TEST_ENVIRONMENT); dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); // Cancel query and verify that third query is still queued cancelQuery(queryRunner, firstDashboardQuery); @@ -296,7 +281,6 @@ public void testQueryExecutionTimeLimit() // set max running queries to 0 for the dashboard resource group so that new queries get queued immediately dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, null, 0, null, null, null, null, null, 3L, TEST_ENVIRONMENT); dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); QueryId secondQuery = createQuery( queryRunner, testSessionBuilder() @@ -315,7 +299,6 @@ public void testQueryExecutionTimeLimit() // reconfigure the resource group to run the second query dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, null, 1, null, null, null, null, null, 3L, TEST_ENVIRONMENT); dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); // cancel the first one and let the second one start dispatchManager.cancelQuery(firstQuery); // wait until the second one is FAILED @@ -354,13 +337,13 @@ public void testNonLeafGroup() .setSchema("sf100000") .setSource("non-leaf") .build(); + QueryManager queryManager = queryRunner.getCoordinator().getQueryManager(); InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); int originalSize = getSelectors(queryRunner).size(); // Add a selector for a non leaf group dao.insertSelector(3, 100, "user.*", "(?i).*non-leaf.*", null, null, null); dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); while (getSelectors(queryRunner).size() != originalSize + 1) { MILLISECONDS.sleep(500); } @@ -375,163 +358,6 @@ public void testNonLeafGroup() assertEquals(queryRunner.getCoordinator().getDispatchManager().getQueryInfo(invalidResourceGroupQuery).getErrorCode(), INVALID_RESOURCE_GROUP.toErrorCode()); } - @Test - public void testNonLeafToLeafTransition() - throws Exception - { - Session session = testSessionBuilder() - .setCatalog("tpch") - .setSchema("sf100000") - .setSource("leaf") - .build(); - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - // Submit query with side effect of creating resource groups - QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, firstDashboardQuery, RUNNING); - cancelQuery(queryRunner, firstDashboardQuery); - waitForQueryState(queryRunner, firstDashboardQuery, FAILED); - //Removing leaf resource group and making non leaf resource group a leaf one - dao.deleteSelectors(4); - dao.deleteSelectors(5); - dao.deleteResourceGroup(4); - dao.deleteResourceGroup(5); - // Add a selector for a non leaf group - dao.insertSelector(3, 100, "user.*", "(?i).*leaf.*", null, null, null); - dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - // Submit a query to a leaf resource group - QueryId leafResourceGroupQuery = createQuery(queryRunner, session, LONG_LASTING_QUERY); - waitForQueryState(queryRunner, leafResourceGroupQuery, RUNNING); - } - - @Test - public void testNonLeafToLeafTransitionShouldBumpSpecVersionOnlyOnce() - throws Exception - { - Session session = testSessionBuilder() - .setCatalog("tpch") - .setSchema("sf100000") - .setSource("leaf") - .build(); - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - // Submit query with side effect of creating resource groups - QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, firstDashboardQuery, RUNNING); - cancelQuery(queryRunner, firstDashboardQuery); - waitForQueryState(queryRunner, firstDashboardQuery, FAILED); - //Removing leaf resource group and making non leaf resource group a leaf one - dao.deleteSelectors(4); - dao.deleteSelectors(5); - dao.deleteResourceGroup(4); - dao.deleteResourceGroup(5); - // Add a selector for a non leaf group - dao.insertSelector(3, 100, "user.*", "(?i).*leaf.*", null, null, null); - assertEquals(dbConfigurationManager.getSpecVersion(), 1); - dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - assertEquals(dbConfigurationManager.getSpecVersion(), 2); - dbConfigurationManager.load(); - assertEquals(dbConfigurationManager.getSpecVersion(), 2); - } - - @Test - public void testLeafToNonLeafTransition() - throws Exception - { - Session leafSession = testSessionBuilder() - .setCatalog("tpch") - .setSchema("sf100000") - .setSource("leaf") - .build(); - - QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, firstDashboardQuery, RUNNING); - - dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 2, null, 2, null, null, null, null, null, 3L, TEST_ENVIRONMENT); - dao.insertResourceGroup(8, "dashboard-${USER}-leaf", "1MB", 1, 1, 1, null, null, null, null, null, 5L, TEST_ENVIRONMENT); - dao.insertSelector(8, 100, "user.*", "(?i).*leaf.*", null, null, null); - - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - - dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - - QueryId leafQuery = createQuery(queryRunner, leafSession, LONG_LASTING_QUERY); - waitForQueryState(queryRunner, leafQuery, RUNNING); - - QueryId secondDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, secondDashboardQuery, FAILED); - assertEquals(queryRunner.getCoordinator().getDispatchManager().getFullQueryInfo(secondDashboardQuery).getErrorCode(), INVALID_RESOURCE_GROUP.toErrorCode()); - } - - @Test - public void testResourceGroupVersioning() - throws Exception - { - QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, firstDashboardQuery, RUNNING); - - QueryId secondDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, secondDashboardQuery, QUEUED); - - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - - dao.updateResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, null, null, null, null, null, 1L, TEST_ENVIRONMENT); - dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, null, null, null, null, 3L, TEST_ENVIRONMENT); - - // Trigger reload to make the test more deterministic - dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - - //Previously queued query start running due to new version being introduced with higher concurrency - waitForQueryState(queryRunner, secondDashboardQuery, RUNNING); - - QueryId thirdDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, thirdDashboardQuery, QUEUED); - - QueryId forthDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, forthDashboardQuery, FAILED); - } - - @Test - public void testOldResourceGroupVersionCleanup() - throws Exception - { - QueryId firstDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, firstDashboardQuery, RUNNING); - - QueryId secondDashboardQuery = createQuery(queryRunner, dashboardSession(), LONG_LASTING_QUERY); - waitForQueryState(queryRunner, secondDashboardQuery, QUEUED); - - cancelQuery(queryRunner, firstDashboardQuery); - waitForQueryState(queryRunner, firstDashboardQuery, FAILED); - - cancelQuery(queryRunner, secondDashboardQuery); - waitForQueryState(queryRunner, secondDashboardQuery, FAILED); - - dao.updateResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, null, null, null, null, null, 1L, TEST_ENVIRONMENT); - dao.updateResourceGroup(5, "dashboard-${USER}", "1MB", 1, 2, 2, null, null, null, null, null, 3L, TEST_ENVIRONMENT); - - InternalResourceGroupManager manager = queryRunner.getCoordinator().getResourceGroupManager().get(); - DbResourceGroupConfigurationManager dbConfigurationManager = (DbResourceGroupConfigurationManager) manager.getConfigurationManager(); - - // Trigger reload to make the test more deterministic - dbConfigurationManager.load(); - manager.refreshInternalResourceGroups(); - - List rootGroups = manager.getRootGroups(); - - for (InternalResourceGroup.RootInternalResourceGroup rootGroup : rootGroups) { - assertEquals(rootGroup.subGroups().size(), 0); - assertEquals(rootGroup.getEligibleSubGroupsCount(), 0); - assertEquals(rootGroup.getDirtySubGroupsCount(), 0); - } - } - private void assertResourceGroupWithClientTags(Set clientTags, ResourceGroupId expectedResourceGroup) throws InterruptedException {