diff --git a/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java b/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java index b72b4f8fba20d..529fe1e2cae30 100644 --- a/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java +++ b/presto-main/src/main/java/com/facebook/presto/dispatcher/DispatchManager.java @@ -24,6 +24,7 @@ import com.facebook.presto.execution.resourceGroups.ResourceGroupManager; import com.facebook.presto.execution.warnings.WarningCollectorFactory; import com.facebook.presto.metadata.SessionPropertyManager; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; import com.facebook.presto.security.AccessControl; import com.facebook.presto.server.BasicQueryInfo; @@ -127,7 +128,8 @@ public DispatchManager( QueryManagerConfig queryManagerConfig, DispatchExecutor dispatchExecutor, ClusterStatusSender clusterStatusSender, - SecurityConfig securityConfig) + SecurityConfig securityConfig, + Optional clusterQueryTrackerService) { this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null"); this.analyzerProvider = requireNonNull(analyzerProvider, "analyzerClient is null"); @@ -147,7 +149,7 @@ public DispatchManager( this.clusterStatusSender = requireNonNull(clusterStatusSender, "clusterStatusSender is null"); - this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor()); + this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor(), clusterQueryTrackerService); this.securityConfig = requireNonNull(securityConfig, "securityConfig is null"); } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java index 99e15c9bc3c1e..e37d5c684ef61 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ -16,6 +16,7 @@ import com.facebook.airlift.log.Logger; import com.facebook.presto.Session; import com.facebook.presto.execution.QueryTracker.TrackedQuery; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.resourceGroups.ResourceGroupQueryLimits; @@ -81,7 +82,9 @@ public class QueryTracker @GuardedBy("this") private ScheduledFuture backgroundTask; - public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor) + private final Optional clusterQueryTrackerService; + + public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorService queryManagementExecutor, Optional clusterQueryTrackerService) { requireNonNull(queryManagerConfig, "queryManagerConfig is null"); this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge(); @@ -91,6 +94,7 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount(); this.queryManagementExecutor = requireNonNull(queryManagementExecutor, "queryManagementExecutor is null"); + this.clusterQueryTrackerService = clusterQueryTrackerService; } public synchronized void start() @@ -280,6 +284,10 @@ void enforceTaskLimits() } } + if (clusterQueryTrackerService.isPresent()) { + totalRunningTaskCount = clusterQueryTrackerService.get().getRunningTaskCount(); + } + runningTaskCount.set(totalRunningTaskCount); int runningTaskCountAfterKills = totalRunningTaskCount; diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java index 3ffa23ea1deca..33a41186901c3 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java @@ -26,6 +26,7 @@ import com.facebook.presto.execution.StateMachine.StateChangeListener; import com.facebook.presto.execution.warnings.WarningCollectorFactory; import com.facebook.presto.memory.ClusterMemoryManager; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.QueryId; @@ -49,6 +50,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -97,7 +99,14 @@ public class SqlQueryManager private final HistoryBasedPlanStatisticsTracker historyBasedPlanStatisticsTracker; @Inject - public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMonitor, EmbedVersion embedVersion, QueryManagerConfig queryManagerConfig, WarningCollectorFactory warningCollectorFactory, HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager) + public SqlQueryManager( + ClusterMemoryManager memoryManager, + QueryMonitor queryMonitor, + EmbedVersion embedVersion, + QueryManagerConfig queryManagerConfig, + WarningCollectorFactory warningCollectorFactory, + HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager, + Optional clusterQueryTrackerService) { this.memoryManager = requireNonNull(memoryManager, "memoryManager is null"); this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); @@ -111,7 +120,7 @@ public SqlQueryManager(ClusterMemoryManager memoryManager, QueryMonitor queryMon this.queryManagementExecutor = Executors.newScheduledThreadPool(queryManagerConfig.getQueryManagerExecutorPoolSize(), threadsNamed("query-management-%s")); this.queryManagementExecutorMBean = new ThreadPoolExecutorMBean((ThreadPoolExecutor) queryManagementExecutor); - this.queryTracker = new QueryTracker<>(queryManagerConfig, queryManagementExecutor); + this.queryTracker = new QueryTracker<>(queryManagerConfig, queryManagementExecutor, clusterQueryTrackerService); requireNonNull(historyBasedPlanStatisticsManager, "historyBasedPlanStatisticsManager is null"); this.historyBasedPlanStatisticsTracker = historyBasedPlanStatisticsManager.getHistoryBasedPlanStatisticsTracker(); } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java new file mode 100644 index 0000000000000..69650289b06dd --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ClusterQueryTrackerService.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.resourcemanager; + +import com.facebook.drift.client.DriftClient; +import com.facebook.presto.util.PeriodicTaskExecutor; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Objects.requireNonNull; + +public class ClusterQueryTrackerService +{ + private final DriftClient resourceManagerClient; + private final ScheduledExecutorService executorService; + private final long runningTaskCountFetchIntervalMillis; + private AtomicInteger runningTaskCount; + private final PeriodicTaskExecutor runningTaskCountUpdater; + + @Inject + public ClusterQueryTrackerService( + @ForResourceManager DriftClient resourceManagerClient, + @ForResourceManager ScheduledExecutorService executorService, + ResourceManagerConfig resourceManagerConfig) + { + this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerClient is null"); + this.executorService = requireNonNull(executorService, "executorService is null"); + this.runningTaskCountFetchIntervalMillis = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getRunningTaskCountFetchInterval().toMillis(); + this.runningTaskCount = new AtomicInteger(0); + this.runningTaskCountUpdater = new PeriodicTaskExecutor(runningTaskCountFetchIntervalMillis, executorService, () -> updateRunningTaskCount()); + } + + @PostConstruct + public void init() + { + runningTaskCountUpdater.start(); + } + + @PreDestroy + public void stop() + { + runningTaskCountUpdater.stop(); + } + + public int getRunningTaskCount() + { + return runningTaskCount.get(); + } + + private void updateRunningTaskCount() + { + this.runningTaskCount.set(resourceManagerClient.get().getRunningTaskCount()); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java index 7d8092ec0794c..e0ada85b8187a 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClient.java @@ -42,4 +42,7 @@ List getResourceGroupInfo(String excludingNode) @ThriftMethod void resourceGroupRuntimeHeartbeat(String node, List resourceGroupRuntimeInfo); + + @ThriftMethod + int getRunningTaskCount(); } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java index dedaab25868b5..39def5f4bb33e 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStateProvider.java @@ -21,6 +21,7 @@ import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.server.BasicQueryStats; import com.facebook.presto.server.NodeStatus; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.memory.ClusterMemoryPoolInfo; @@ -49,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.facebook.presto.SystemSessionProperties.resourceOvercommit; @@ -261,6 +263,18 @@ public List getClusterQueries() .collect(toImmutableList()); } + public int getRunningTaskCount() + { + int runningTaskCount = nodeQueryStates.values().stream() + .map(CoordinatorQueriesState::getActiveQueries) + .flatMap(Collection::stream) + .map(Query::getBasicQueryInfo) + .filter(q -> q.getState() == RUNNING) + .map(BasicQueryInfo::getQueryStats) + .collect(Collectors.summingInt(BasicQueryStats::getRunningTasks)); + return runningTaskCount; + } + public Map getClusterMemoryPoolInfo() { return clusterMemoryPoolInfosSupplier.get(); diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java index 5264617e0f4ae..299afb5e0faab 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerConfig.java @@ -44,6 +44,8 @@ public class ResourceManagerConfig private Duration resourceGroupServiceCacheExpireInterval = new Duration(10, SECONDS); private Duration resourceGroupServiceCacheRefreshInterval = new Duration(1, SECONDS); + private Duration runningTaskCountFetchInterval = new Duration(1, SECONDS); + @MinDuration("1ms") public Duration getQueryExpirationTimeout() { @@ -251,4 +253,15 @@ public ResourceManagerConfig setResourceGroupServiceCacheRefreshInterval(Duratio this.resourceGroupServiceCacheRefreshInterval = resourceGroupServiceCacheRefreshInterval; return this; } + + public Duration getRunningTaskCountFetchInterval() + { + return runningTaskCountFetchInterval; + } + @Config("resource-manager.running-task-count-fetch-interval") + public ResourceManagerConfig setRunningTaskCountFetchInterval(Duration runningTaskCountFetchInterval) + { + this.runningTaskCountFetchInterval = runningTaskCountFetchInterval; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java index 5f8a29c3e3435..b79f1a2110521 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerServer.java @@ -87,4 +87,10 @@ public void resourceGroupRuntimeHeartbeat(String node, List clusterStateProvider.registerResourceGroupRuntimeHeartbeat(node, resourceGroupRuntimeInfos)); } + + @ThriftMethod + public ListenableFuture getRunningTaskCount() + { + return executor.submit(clusterStateProvider::getRunningTaskCount); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 4c6905c1b4080..8fd468ba9b00e 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -121,6 +121,7 @@ import com.facebook.presto.operator.TaskMemoryReservationSummary; import com.facebook.presto.operator.index.IndexJoinLookupStats; import com.facebook.presto.resourcemanager.ClusterMemoryManagerService; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; import com.facebook.presto.resourcemanager.ForResourceManager; import com.facebook.presto.resourcemanager.NoopResourceGroupService; @@ -401,6 +402,7 @@ else if (serverConfig.isCoordinator()) { addressSelectorBinder.bind(AddressSelector.class).annotatedWith(annotation).to(RandomCatalogServerAddressSelector.class)); newOptionalBinder(binder, ClusterMemoryManagerService.class); + newOptionalBinder(binder, ClusterQueryTrackerService.class); install(installModuleIf( ServerConfig.class, ServerConfig::isResourceManagerEnabled, @@ -413,6 +415,7 @@ public void configure(Binder moduleBinder) moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON); if (serverConfig.isCoordinator()) { moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON); + moduleBinder.bind(ClusterQueryTrackerService.class).in(Scopes.SINGLETON); moduleBinder.bind(ResourceGroupService.class).to(ResourceManagerResourceGroupService.class).in(Scopes.SINGLETON); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java index 77fe8115adb99..60f003d53e6d0 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryTrackerHighTaskCountKill.java @@ -13,9 +13,15 @@ */ package com.facebook.presto.execution; +import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; +import com.facebook.presto.resourcemanager.ResourceManagerClient; +import com.facebook.presto.resourcemanager.ResourceManagerConfig; +import com.facebook.presto.resourcemanager.TestingClusterQueryTrackerService; +import com.facebook.presto.resourcemanager.TestingResourceManagerClient; import com.facebook.presto.spi.PrestoException; import org.testng.annotations.Test; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; @@ -34,7 +40,7 @@ public void testMultipleQueriesKilledDueToTaskCount() .setMaxTotalRunningTaskCountToKillQuery(200); ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(); try { - QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService); + QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.empty()); MockQueryExecution smallQuery1 = MockQueryExecution.withRunningTaskCount(50); MockQueryExecution largeQueryButNotKilled = MockQueryExecution.withRunningTaskCount(101); MockQueryExecution largeQueryToBeKilled1 = MockQueryExecution.withRunningTaskCount(200); @@ -64,4 +70,33 @@ public void testMultipleQueriesKilledDueToTaskCount() scheduledExecutorService.shutdownNow(); } } + + @Test + public void testLargeQueryKilledDueToTaskCount_withClusterQueryTracker() + { + QueryManagerConfig config = new QueryManagerConfig() + .setMaxQueryRunningTaskCount(100) + .setMaxTotalRunningTaskCountToKillQuery(200); + ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(); + ResourceManagerClient resourceManagerClient = new TestingResourceManagerClient(); + ClusterQueryTrackerService clusterQueryTrackerService = new TestingClusterQueryTrackerService((addressSelectionContext, headers) -> resourceManagerClient, newSingleThreadScheduledExecutor(), new ResourceManagerConfig(), 201); + try { + QueryTracker queryTracker = new QueryTracker<>(config, scheduledExecutorService, Optional.of(clusterQueryTrackerService)); + MockQueryExecution smallQuery = MockQueryExecution.withRunningTaskCount(50); + MockQueryExecution largeQueryToBeKilled = MockQueryExecution.withRunningTaskCount(101); + + queryTracker.addQuery(smallQuery); + queryTracker.addQuery(largeQueryToBeKilled); + + queryTracker.enforceTaskLimits(); + + assertFalse(smallQuery.getFailureReason().isPresent(), "small query should not be killed"); + Throwable failureReason = largeQueryToBeKilled.getFailureReason().get(); + assertTrue(failureReason instanceof PrestoException); + assertEquals(((PrestoException) failureReason).getErrorCode(), QUERY_HAS_TOO_MANY_STAGES.toErrorCode()); + } + finally { + scheduledExecutorService.shutdownNow(); + } + } } diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java index 30200a3aadaa8..7b6933a9a1418 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStateProvider.java @@ -540,6 +540,34 @@ public void testShuttingDownCoordinatorHeartbeat() assertQueryInfos(provider.getClusterQueries(), 4, 3); } + @Test + public void testRunningTaskCount() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + nodeManager.addShuttingDownNode(new InternalNode("node1", URI.create("local://127.0.0.1"), NodeVersion.UNKNOWN, true)); + + ResourceManagerClusterStateProvider provider = new ResourceManagerClusterStateProvider(nodeManager, new SessionPropertyManager(), 10, Duration.valueOf("4s"), Duration.valueOf("8s"), Duration.valueOf("5s"), Duration.valueOf("0s"), true, newSingleThreadScheduledExecutor()); + + assertEquals(provider.getRunningTaskCount(), 0); + + long query1Sequence = 0; + long query2Sequence = 0; + long query3Sequence = 0; + long query4Sequence = 0; + + provider.registerQueryHeartbeat("node1", createQueryInfo("1", QUEUED), query1Sequence++); + assertEquals(provider.getRunningTaskCount(), 0); + + provider.registerQueryHeartbeat("node1", createQueryInfo("2", RUNNING), query2Sequence++); + assertEquals(provider.getRunningTaskCount(), 11); + + provider.registerQueryHeartbeat("node1", createQueryInfo("3", FINISHED), query3Sequence++); + assertEquals(provider.getRunningTaskCount(), 11); + + provider.registerQueryHeartbeat("node1", createQueryInfo("4", FAILED), query4Sequence++); + assertEquals(provider.getRunningTaskCount(), 11); + } + void assertWorkerMemoryInfo(ResourceManagerClusterStateProvider provider, int count) { Map workerMemoryInfo = provider.getWorkerMemoryInfo(); diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java index 60aad6b0ff240..b4d8c9ca34691 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerConfig.java @@ -47,7 +47,8 @@ public void testDefaults() .setResourceGroupServiceCacheEnabled(false) .setResourceGroupServiceCacheExpireInterval(new Duration(10, SECONDS)) .setResourceGroupServiceCacheRefreshInterval(new Duration(1, SECONDS)) - .setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS))); + .setResourceGroupRuntimeHeartbeatInterval(new Duration(1, SECONDS)) + .setRunningTaskCountFetchInterval(new Duration(1, SECONDS))); } @Test @@ -70,6 +71,7 @@ public void testExplicitPropertyMappings() .put("resource-manager.resource-group-service-cache-expire-interval", "1m") .put("resource-manager.resource-group-service-cache-refresh-interval", "10m") .put("resource-manager.resource-group-runtimeinfo-heartbeat-interval", "6m") + .put("resource-manager.running-task-count-fetch-interval", "1m") .build(); ResourceManagerConfig expected = new ResourceManagerConfig() @@ -88,7 +90,8 @@ public void testExplicitPropertyMappings() .setResourceGroupServiceCacheEnabled(true) .setResourceGroupServiceCacheExpireInterval(new Duration(1, MINUTES)) .setResourceGroupServiceCacheRefreshInterval(new Duration(10, MINUTES)) - .setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES)); + .setResourceGroupRuntimeHeartbeatInterval(new Duration(6, MINUTES)) + .setRunningTaskCountFetchInterval(new Duration(1, MINUTES)); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java new file mode 100644 index 0000000000000..a58bf8b29bd9f --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingClusterQueryTrackerService.java @@ -0,0 +1,37 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.resourcemanager; + +import com.facebook.drift.client.DriftClient; + +import java.util.concurrent.ScheduledExecutorService; + +public class TestingClusterQueryTrackerService + extends ClusterQueryTrackerService +{ + DriftClient resourceManagerClient; + int runningTaskCount; + + public TestingClusterQueryTrackerService(DriftClient resourceManagerClient, ScheduledExecutorService executorService, ResourceManagerConfig resourceManagerConfig, int runningTaskCount) + { + super(resourceManagerClient, executorService, resourceManagerConfig); + this.runningTaskCount = runningTaskCount; + } + + @Override + public int getRunningTaskCount() + { + return runningTaskCount; + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java index 0ebe6f6c2b32f..282a499a6dbbd 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestingResourceManagerClient.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -class TestingResourceManagerClient +public class TestingResourceManagerClient implements ResourceManagerClient { private final AtomicInteger queryHeartbeats = new AtomicInteger(); @@ -36,6 +36,8 @@ class TestingResourceManagerClient private volatile List resourceGroupRuntimeInfos = ImmutableList.of(); + private int runningTaskCount; + @Override public void queryHeartbeat(String internalNode, BasicQueryInfo basicQueryInfo, long sequenceId) { @@ -55,6 +57,11 @@ public void setResourceGroupRuntimeInfos(List resource this.resourceGroupRuntimeInfos = ImmutableList.copyOf(resourceGroupRuntimeInfos); } + public void setRunningTaskCount(int runningTaskCount) + { + this.runningTaskCount = runningTaskCount; + } + @Override public void nodeHeartbeat(NodeStatus nodeStatus) { @@ -92,4 +99,9 @@ public int getResourceGroupRuntimeHeartbeats() { return resourceGroupRuntimeHeartbeats.get(); } + + public int getRunningTaskCount() + { + return runningTaskCount; + } }