Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.memory.ClusterMemoryPoolManager;
import com.facebook.presto.spi.resourceGroups.ResourceGroup;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManager;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
Expand Down Expand Up @@ -62,7 +63,10 @@
import java.util.function.Supplier;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.execution.resourceGroups.LegacyResourceGroupConfigurationManager.HARD_CONCURRENCY_LIMIT;
import static com.facebook.presto.execution.resourceGroups.LegacyResourceGroupConfigurationManager.MAX_QUEUED_QUERIES;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.facebook.presto.spi.StandardErrorCode.SERVER_STARTING_UP;
import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -89,7 +93,7 @@ public final class InternalResourceGroupManager<C>
private final ConcurrentMap<ResourceGroupId, InternalResourceGroup> groups = new ConcurrentHashMap<>();
private final AtomicReference<ResourceGroupConfigurationManager<C>> configurationManager;
private final ResourceGroupConfigurationManagerContext configurationManagerContext;
private final ResourceGroupConfigurationManager<?> legacyManager;
private final ResourceGroupConfigurationManager<?> initializingConfigurationManager;
private final MBeanExporter exporter;
private final AtomicBoolean started = new AtomicBoolean();
private final AtomicLong lastCpuQuotaGenerationNanos = new AtomicLong(System.nanoTime());
Expand All @@ -104,28 +108,29 @@ public final class InternalResourceGroupManager<C>
private final double concurrencyThreshold;
private final Duration resourceGroupRuntimeInfoRefreshInterval;
private final boolean isResourceManagerEnabled;
private final QueryManagerConfig queryManagerConfig;

@Inject
public InternalResourceGroupManager(
LegacyResourceGroupConfigurationManager legacyManager,
ClusterMemoryPoolManager memoryPoolManager,
QueryManagerConfig queryManagerConfig,
NodeInfo nodeInfo,
MBeanExporter exporter,
ResourceGroupService resourceGroupService,
ServerConfig serverConfig)
{
requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.exporter = requireNonNull(exporter, "exporter is null");
this.configurationManagerContext = new ResourceGroupConfigurationManagerContextInstance(memoryPoolManager, nodeInfo.getEnvironment());
this.legacyManager = requireNonNull(legacyManager, "legacyManager is null");
this.configurationManager = new AtomicReference<>(cast(legacyManager));
this.initializingConfigurationManager = new InitializingConfigurationManager();
this.configurationManager = new AtomicReference(cast(initializingConfigurationManager));
this.maxTotalRunningTaskCountToNotExecuteNewQuery = queryManagerConfig.getMaxTotalRunningTaskCountToNotExecuteNewQuery();
this.resourceGroupService = requireNonNull(resourceGroupService, "resourceGroupService is null");
this.concurrencyThreshold = queryManagerConfig.getConcurrencyThresholdToEnableResourceGroupRefresh();
this.resourceGroupRuntimeInfoRefreshInterval = queryManagerConfig.getResourceGroupRunTimeInfoRefreshInterval();
this.isResourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
}

@Override
Expand Down Expand Up @@ -178,10 +183,15 @@ public void loadConfigurationManager()

setConfigurationManager(configurationManagerName, properties);
}
else {
Map<String, String> legacyProperties = ImmutableMap.of(
HARD_CONCURRENCY_LIMIT, Integer.toString(queryManagerConfig.getMaxConcurrentQueries()),
MAX_QUEUED_QUERIES, Integer.toString(queryManagerConfig.getMaxQueuedQueries()));
setConfigurationManager(LegacyResourceGroupConfigurationManager.NAME, legacyProperties);
}
}

@VisibleForTesting
public void setConfigurationManager(String name, Map<String, String> properties)
private void setConfigurationManager(String name, Map<String, String> properties)
{
requireNonNull(name, "name is null");
requireNonNull(properties, "properties is null");
Expand All @@ -192,7 +202,27 @@ public void setConfigurationManager(String name, Map<String, String> properties)
checkState(configurationManagerFactory != null, "Resource group configuration manager %s is not registered", name);

ResourceGroupConfigurationManager<C> configurationManager = cast(configurationManagerFactory.create(ImmutableMap.copyOf(properties), configurationManagerContext));
checkState(this.configurationManager.compareAndSet(cast(legacyManager), configurationManager), "configurationManager already set");
checkState(this.configurationManager.compareAndSet(cast(initializingConfigurationManager), configurationManager), "configurationManager already set");

log.info("-- Loaded resource group configuration manager %s --", name);
}

/**
* for use in testing to override the default configuration manager configured for the server
*/
@VisibleForTesting
public void forceSetConfigurationManager(String name, Map<String, String> properties)
{
requireNonNull(name, "name is null");
requireNonNull(properties, "properties is null");

log.info("-- Loading new resource group configuration manager --");

ResourceGroupConfigurationManagerFactory configurationManagerFactory = configurationManagerFactories.get(name);
checkState(configurationManagerFactory != null, "Resource group configuration manager %s is not registered", name);

ResourceGroupConfigurationManager<C> configurationManager = cast(configurationManagerFactory.create(ImmutableMap.copyOf(properties), configurationManagerContext));
this.configurationManager.set(configurationManager);

log.info("-- Loaded resource group configuration manager %s --", name);
}
Expand All @@ -202,7 +232,7 @@ public void setConfigurationManager(String name, Map<String, String> properties)
public ResourceGroupConfigurationManager<C> getConfigurationManager()
{
ResourceGroupConfigurationManager<C> manager = configurationManager.get();
checkState(manager != legacyManager, "cannot fetch legacy manager");
checkState(manager != initializingConfigurationManager, "cannot fetch initializing manager");
return manager;
}

Expand Down Expand Up @@ -473,4 +503,20 @@ private static <C> ResourceGroupConfigurationManager<C> cast(ResourceGroupConfig
{
return (ResourceGroupConfigurationManager<C>) manager;
}

private static class InitializingConfigurationManager
implements ResourceGroupConfigurationManager<Void>
{
@Override
public void configure(ResourceGroup group, SelectionContext criteria)
{
throw new PrestoException(SERVER_STARTING_UP, "Presto server is still initializing");
}

@Override
public Optional<SelectionContext<Void>> match(SelectionCriteria criteria)
{
throw new PrestoException(SERVER_STARTING_UP, "Presto server is still initializing");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@
*/
package com.facebook.presto.execution.resourceGroups;

import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.resourceGroups.LegacyResourceGroupConfigurationManager.VoidContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroup;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManager;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerContext;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;

import javax.inject.Inject;

import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class LegacyResourceGroupConfigurationManager
implements ResourceGroupConfigurationManager<VoidContext>
Expand All @@ -35,16 +36,41 @@ enum VoidContext
NONE
}

public static final String NAME = "legacy";
public static final String HARD_CONCURRENCY_LIMIT = "hard_concurrency_limit";
public static final String MAX_QUEUED_QUERIES = "max_queued_queries";

private static final ResourceGroupId GLOBAL = new ResourceGroupId("global");

private final int hardConcurrencyLimit;
private final int maxQueued;

@Inject
public LegacyResourceGroupConfigurationManager(QueryManagerConfig config)
public LegacyResourceGroupConfigurationManager(int hardConcurrencyLimit, int maxQueued)
{
checkArgument(hardConcurrencyLimit > 0, "hardConcurrencyLimit must be greater than 0");
checkArgument(maxQueued > 0, "maxQueued must be greater than 0");
this.hardConcurrencyLimit = hardConcurrencyLimit;
this.maxQueued = maxQueued;
}

public static class Factory
implements ResourceGroupConfigurationManagerFactory
{
hardConcurrencyLimit = config.getMaxConcurrentQueries();
maxQueued = config.getMaxQueuedQueries();
@Override
public String getName()
{
return NAME;
}

@Override
public ResourceGroupConfigurationManager<?> create(Map<String, String> config, ResourceGroupConfigurationManagerContext context)
{
String hardConcurrencyLimitString = requireNonNull(config.get(HARD_CONCURRENCY_LIMIT), "LegacyResourceGroupConfigurationManager must have config hard_concurrency_liimt");
int hardConcurrencyLimit = Integer.parseInt(hardConcurrencyLimitString);
String maxQueuedString = requireNonNull(config.get(MAX_QUEUED_QUERIES), "LegacyResourceGroupConfigurationManager must have config max_queued_queries");
int maxQueued = Integer.parseInt(maxQueuedString);
return new LegacyResourceGroupConfigurationManager(hardConcurrencyLimit, maxQueued);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager;
import com.facebook.presto.execution.resourceGroups.LegacyResourceGroupConfigurationManager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.execution.scheduler.AdaptivePhasedExecutionPolicy;
import com.facebook.presto.execution.scheduler.AllAtOnceExecutionPolicy;
Expand Down Expand Up @@ -181,7 +180,6 @@ protected void setup(Binder binder)
binder.bind(InternalResourceGroupManager.class).in(Scopes.SINGLETON);
newExporter(binder).export(InternalResourceGroupManager.class).withGeneratedName();
binder.bind(ResourceGroupManager.class).to(InternalResourceGroupManager.class);
binder.bind(LegacyResourceGroupConfigurationManager.class).in(Scopes.SINGLETON);
binder.bind(RetryCircuitBreaker.class).in(Scopes.SINGLETON);
newExporter(binder).export(RetryCircuitBreaker.class).withGeneratedName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ public TestingPrestoServer(
this.resourceGroupManager = resourceGroupManager instanceof InternalResourceGroupManager
? Optional.of((InternalResourceGroupManager<?>) resourceGroupManager)
: Optional.empty();
resourceGroupManager.loadConfigurationManager();
nodePartitioningManager = injector.getInstance(NodePartitioningManager.class);
planOptimizerManager = injector.getInstance(ConnectorPlanOptimizerManager.class);
clusterMemoryManager = injector.getInstance(ClusterMemoryManager.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.execution.resourceGroups;

import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.execution.MockManagedQueryExecution;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.testing.TestingMBeanServer;

public class TestInternalResourceGroupManager
{
@Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = ".*Presto server is still initializing.*")
public void testQueryFailsWithInitializingConfigurationManager()
{
InternalResourceGroupManager<ImmutableMap<Object, Object>> internalResourceGroupManager = new InternalResourceGroupManager<>((poolId, listener) -> {},
new QueryManagerConfig(), new NodeInfo("test"), new MBeanExporter(new TestingMBeanServer()), () -> null, new ServerConfig());
internalResourceGroupManager.submit(new MockManagedQueryExecution(0), new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), command -> {});
}

@Test
public void testQuerySucceedsWhenConfigurationManagerLoaded()
throws Exception
{
InternalResourceGroupManager<ImmutableMap<Object, Object>> internalResourceGroupManager = new InternalResourceGroupManager<>((poolId, listener) -> {},
new QueryManagerConfig(), new NodeInfo("test"), new MBeanExporter(new TestingMBeanServer()), () -> null, new ServerConfig());
internalResourceGroupManager.loadConfigurationManager();
internalResourceGroupManager.submit(new MockManagedQueryExecution(0), new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), command -> {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import com.facebook.presto.execution.executor.MultilevelSplitQueue;
import com.facebook.presto.execution.executor.TaskExecutor;
import com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager;
import com.facebook.presto.execution.resourceGroups.LegacyResourceGroupConfigurationManager;
import com.facebook.presto.execution.resourceGroups.ResourceGroupManager;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.execution.scheduler.nodeSelection.SimpleTtlNodeSelectorConfig;
Expand Down Expand Up @@ -485,7 +484,6 @@ protected void setup(Binder binder)
binder.bind(InternalResourceGroupManager.class).in(Scopes.SINGLETON);
binder.bind(ResourceGroupManager.class).to(InternalResourceGroupManager.class);
binder.bind(new TypeLiteral<ResourceGroupManager<?>>() {}).to(new TypeLiteral<InternalResourceGroupManager<?>>() {});
binder.bind(LegacyResourceGroupConfigurationManager.class).in(Scopes.SINGLETON);
binder.bind(ClusterMemoryPoolManager.class).toInstance(((poolId, listener) -> {}));
binder.bind(QueryPrerequisitesManager.class).in(Scopes.SINGLETON);
binder.bind(ResourceGroupService.class).to(NoopResourceGroupService.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private void setUp()
queryRunner.installPlugin(new ResourceGroupManagerPlugin());
queryRunner.createCatalog("tpch", "tpch", ImmutableMap.of("tpch.splits-per-node", Integer.toString(SPLITS_PER_NODE)));
queryRunner.getCoordinator().getResourceGroupManager().get()
.setConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json")));
.forceSetConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json")));
}

private String getResourceFilePath(String fileName)
Expand Down
Loading