Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ private static long getSubGroupSchedulingPriority(SchedulingPolicy policy, Inter

private long computeSchedulingWeight()
{
if (runningQueries.size() + descendantRunningQueries >= softConcurrencyLimit) {
if (getAggregatedRunningQueries() >= softConcurrencyLimit) {
return schedulingWeight;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,28 +133,28 @@ public static H2ResourceGroupsDao getDao(String url)
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), 1, false);
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), 1, false, false);
}

public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, int coordinatorCount)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), coordinatorCount, false);
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), coordinatorCount, false, false);
}

public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, Map<String, String> coordinatorProperties, int coordinatorCount)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, coordinatorProperties, coordinatorCount, false);
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, coordinatorProperties, coordinatorCount, false, false);
}

public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, Map<String, String> coordinatorProperties, int coordinatorCount, boolean weightedFairSchedulingEnabled)
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, Map<String, String> coordinatorProperties, int coordinatorCount, boolean weightedFairSchedulingEnabled, boolean weightedSchedulingEnabled)
throws Exception
{
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, coordinatorProperties, coordinatorCount, weightedFairSchedulingEnabled);
return createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, coordinatorProperties, coordinatorCount, weightedFairSchedulingEnabled, weightedSchedulingEnabled);
}

public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, String environment, Map<String, String> coordinatorProperties, int coordinatorCount, boolean weightedFairSchedulingEnabled)
public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2ResourceGroupsDao dao, String environment, Map<String, String> coordinatorProperties, int coordinatorCount, boolean weightedFairSchedulingEnabled, boolean weightedSchedulingEnabled)
throws Exception
{
DistributedQueryRunner queryRunner = DistributedQueryRunner
Expand All @@ -174,7 +174,7 @@ public static DistributedQueryRunner createQueryRunner(String dbConfigUrl, H2Res
}
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
setup(queryRunner, dao, environment, weightedFairSchedulingEnabled);
setup(queryRunner, dao, environment, weightedFairSchedulingEnabled, weightedSchedulingEnabled);
queryRunner.waitForClusterToGetReady();
return queryRunner;
}
Expand Down Expand Up @@ -236,12 +236,37 @@ private static void resourceGroupSetupWithWeightedFairPolicy(H2ResourceGroupsDao
dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null, null);
}

private static void setup(DistributedQueryRunner queryRunner, H2ResourceGroupsDao dao, String environment, boolean weightedFairSchedulingEnabled)
private static void resourceGroupSetupWithWeightedPolicy(H2ResourceGroupsDao dao)
{
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertResourceGroup(1, "global", "1MB", 100, 1000, 1000, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(2, "bi-${USER}", "1MB", 3, 2, 2, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(3, "user-${USER}", "1MB", 3, 4, 4, SchedulingPolicy.WEIGHTED.toString(), null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(4, "adhoc-${USER}", "1MB", 3, 3, 3, null, 10, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(5, "dashboard-${USER}", "1MB", 1, 1, 2, null, 1000, null, null, null, null, null, null, 3L, TEST_ENVIRONMENT);
dao.insertResourceGroup(6, "no-queueing", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT_2);
dao.insertResourceGroup(7, "explain", "1MB", 0, 1, 1, null, null, null, null, null, null, null, null, null, TEST_ENVIRONMENT);
dao.insertResourceGroup(8, "test", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, 1L, TEST_ENVIRONMENT);
dao.insertResourceGroup(9, "test-${USER}", "1MB", 3, 3, 3, null, null, null, null, null, null, null, null, 8L, TEST_ENVIRONMENT);
dao.insertSelector(2, 10_000, "user.*", "test", null, null, null, null);
dao.insertSelector(4, 1_000, "user.*", "(?i).*adhoc.*", null, null, null, null);
dao.insertSelector(5, 100, "user.*", "(?i).*dashboard.*", null, null, null, null);
dao.insertSelector(4, 10, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1", "tag2")), null, null);
dao.insertSelector(2, 1, "user.*", null, null, CLIENT_TAGS_CODEC.toJson(ImmutableList.of("tag1")), null, null);
dao.insertSelector(6, 6, ".*", ".*", null, null, null, null);
dao.insertSelector(7, 100_000, null, null, EXPLAIN.name(), null, null, null);
dao.insertSelector(9, 10_000, "user.*", "abc", null, null, null, null);
}

private static void setup(DistributedQueryRunner queryRunner, H2ResourceGroupsDao dao, String environment, boolean weightedFairSchedulingEnabled, boolean weightedSchedulingEnabled)
throws InterruptedException
{
if (weightedFairSchedulingEnabled) {
resourceGroupSetupWithWeightedFairPolicy(dao);
}
else if (weightedSchedulingEnabled) {
resourceGroupSetupWithWeightedPolicy(dao);
}
else {
resourceGroupSetup(dao);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeGroups;
import org.testng.annotations.Test;

import java.util.Map;
Expand All @@ -45,8 +45,8 @@ public class TestDistributedQueuesSchedulingPolicy
private static final String LONG_LASTING_QUERY = "SELECT COUNT(*) FROM lineitem";
private DistributedQueryRunner queryRunner;

@BeforeMethod
public void setup()
@BeforeGroups(groups = "weightedFairScheduling")
public void weightedFairSchedulingSetup()
throws Exception
{
String dbConfigUrl = getDbConfigUrl();
Expand All @@ -57,7 +57,22 @@ public void setup()
coordinatorProperties.put("resource-group-runtimeinfo-refresh-interval", "500ms");
coordinatorProperties.put("concurrency-threshold-to-enable-resource-group-refresh", "0");

queryRunner = createQueryRunner(dbConfigUrl, dao, coordinatorProperties.build(), 2, true);
queryRunner = createQueryRunner(dbConfigUrl, dao, coordinatorProperties.build(), 2, true, false);
}

@BeforeGroups(groups = "weightedScheduling")
public void weightedSchedulingSetup()
throws Exception
{
String dbConfigUrl = getDbConfigUrl();
H2ResourceGroupsDao dao = getDao(dbConfigUrl);
ImmutableMap.Builder<String, String> coordinatorProperties = new ImmutableMap.Builder<>();
coordinatorProperties.put("query-manager.experimental.required-coordinators", "2");
coordinatorProperties.put("resource-manager.query-heartbeat-interval", "10ms");
coordinatorProperties.put("resource-group-runtimeinfo-refresh-interval", "500ms");
coordinatorProperties.put("concurrency-threshold-to-enable-resource-group-refresh", "0");

queryRunner = createQueryRunner(dbConfigUrl, dao, coordinatorProperties.build(), 2, false, true);
}

@AfterMethod(alwaysRun = true)
Expand All @@ -67,8 +82,8 @@ public void tearDown()
queryRunner = null;
}

@Test(timeOut = 60_000)
public void test()
@Test(timeOut = 60_000, groups = "weightedFairScheduling")
public void testWeightedFairScheduling()
throws Exception
{
QueryId firstAdhocQuery = createQuery(queryRunner, 0, adhocSession(), LONG_LASTING_QUERY);
Expand Down Expand Up @@ -120,4 +135,58 @@ public void test()
cancelQuery(queryRunner, 0, firstDashboardQuery);
waitForQueryState(queryRunner, 1, thirdAdhocQuery, RUNNING);
}

@Test(timeOut = 60_000, groups = "weightedScheduling")
public void testWeightedScheduling()
throws Exception
{
QueryId firstAdhocQuery = createQuery(queryRunner, 0, adhocSession(), LONG_LASTING_QUERY);

QueryId secondAdhocQuery = createQuery(queryRunner, 1, adhocSession(), LONG_LASTING_QUERY);

QueryId firstDashboardQuery = createQuery(queryRunner, 0, dashboardSession(), LONG_LASTING_QUERY);

QueryId secondDashboardQuery = createQuery(queryRunner, 1, dashboardSession(), LONG_LASTING_QUERY);

waitForQueryState(queryRunner, 0, firstAdhocQuery, RUNNING);
waitForQueryState(queryRunner, 1, secondAdhocQuery, RUNNING);
waitForQueryState(queryRunner, 0, firstDashboardQuery, RUNNING);
waitForQueryState(queryRunner, 1, secondDashboardQuery, RUNNING);

Map<ResourceGroupId, ResourceGroupRuntimeInfo> resourceGroupRuntimeInfoSnapshot;
int globalRunningQueries = 0;
do {
MILLISECONDS.sleep(100);
globalRunningQueries = 0;
for (int coordinator = 0; coordinator < 2; coordinator++) {
resourceGroupRuntimeInfoSnapshot = queryRunner.getCoordinator(coordinator).getResourceGroupManager().get().getResourceGroupRuntimeInfosSnapshot();
ResourceGroupRuntimeInfo resourceGroupRuntimeInfo = resourceGroupRuntimeInfoSnapshot.get(new ResourceGroupId("global"));
if (resourceGroupRuntimeInfo != null) {
globalRunningQueries += resourceGroupRuntimeInfo.getDescendantRunningQueries();
}
}
} while (globalRunningQueries != 4);

QueryId thirdAdhocQuery = createQuery(queryRunner, 0, adhocSession(), LONG_LASTING_QUERY);
QueryId thirdDashboardQuery = createQuery(queryRunner, 0, dashboardSession(), LONG_LASTING_QUERY);

waitForQueryState(queryRunner, 0, thirdAdhocQuery, QUEUED);
waitForQueryState(queryRunner, 0, thirdDashboardQuery, QUEUED);

int globalQueuedQueries = 0;
do {
MILLISECONDS.sleep(100);
globalQueuedQueries = 0;
for (int coordinator = 0; coordinator < 2; coordinator++) {
resourceGroupRuntimeInfoSnapshot = queryRunner.getCoordinator(coordinator).getResourceGroupManager().get().getResourceGroupRuntimeInfosSnapshot();
ResourceGroupRuntimeInfo resourceGroupRuntimeInfo = resourceGroupRuntimeInfoSnapshot.get(new ResourceGroupId("global"));
if (resourceGroupRuntimeInfo != null) {
globalQueuedQueries += resourceGroupRuntimeInfo.getDescendantQueuedQueries();
}
}
} while (globalQueuedQueries != 2);

cancelQuery(queryRunner, 1, secondDashboardQuery);
waitForQueryState(queryRunner, 0, thirdAdhocQuery, RUNNING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testEnvironment1()
{
String dbConfigUrl = getDbConfigUrl();
H2ResourceGroupsDao dao = getDao(dbConfigUrl);
try (DistributedQueryRunner runner = createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), 1, false)) {
try (DistributedQueryRunner runner = createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT, ImmutableMap.of(), 1, false, false)) {
QueryId firstQuery = createQuery(runner, adhocSession(), LONG_LASTING_QUERY);
waitForQueryState(runner, firstQuery, RUNNING);
QueryId secondQuery = createQuery(runner, adhocSession(), LONG_LASTING_QUERY);
Expand All @@ -55,7 +55,7 @@ public void testEnvironment2()
{
String dbConfigUrl = getDbConfigUrl();
H2ResourceGroupsDao dao = getDao(dbConfigUrl);
try (DistributedQueryRunner runner = createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT_2, ImmutableMap.of(), 1, false)) {
try (DistributedQueryRunner runner = createQueryRunner(dbConfigUrl, dao, TEST_ENVIRONMENT_2, ImmutableMap.of(), 1, false, false)) {
QueryId firstQuery = createQuery(runner, adhocSession(), LONG_LASTING_QUERY);
waitForQueryState(runner, firstQuery, RUNNING);
QueryId secondQuery = createQuery(runner, adhocSession(), LONG_LASTING_QUERY);
Expand Down