Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import static com.facebook.presto.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.PARTIAL_MERGE_PUSHDOWN_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
Expand Down Expand Up @@ -3355,37 +3354,32 @@ private void testGroupedExecution(Session session)
Session notColocated = Session.builder(session)
.setSystemProperty(COLOCATED_JOIN, "false")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "false")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "false")
.build();
// Co-located JOIN with all groups at once, fixed schedule
Session colocatedAllGroupsAtOnce = Session.builder(session)
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "0")
.setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "false")
.build();
// Co-located JOIN, 1 group per worker at a time, fixed schedule
Session colocatedOneGroupAtATime = Session.builder(session)
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
.setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "false")
.build();
// Co-located JOIN with all groups at once, dynamic schedule
Session colocatedAllGroupsAtOnceDynamic = Session.builder(session)
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "0")
.setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true")
.build();
// Co-located JOIN, 1 group per worker at a time, dynamic schedule
Session colocatedOneGroupAtATimeDynamic = Session.builder(session)
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
.setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true")
.build();
Expand All @@ -3394,7 +3388,6 @@ private void testGroupedExecution(Session session)
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name())
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
.build();

Expand All @@ -3403,17 +3396,10 @@ private void testGroupedExecution(Session session)
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name())
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
.setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true")
.build();

// Eligible table scans only, 1 group per worker at a time
Session eligibleTableScans = Session.builder(session)
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
.build();

//
// HASH JOIN
// =========
Expand Down Expand Up @@ -3822,18 +3808,6 @@ private void testGroupedExecution(Session session)
assertQuery(notColocated, joinMismatchedBuckets, expectedJoinMismatchedBuckets);
assertQuery(colocatedAllGroupsAtOnce, joinMismatchedBuckets, expectedJoinMismatchedBuckets, assertRemoteExchangesCount(1));
assertQuery(colocatedOneGroupAtATime, joinMismatchedBuckets, expectedJoinMismatchedBuckets, assertRemoteExchangesCount(1));

//
// Eligible table scans only
// =============================
@Language("SQL") String tableScan = "SELECT key1, value1 FROM test_grouped_join1";
@Language("SQL") String expectedTableScan = "SELECT orderkey, comment FROM orders";

assertQuery(notColocated, tableScan, expectedTableScan);
assertQuery(eligibleTableScans, tableScan, expectedTableScan);

// Input connector returns non-empty TablePartitioning for unpartitioned table.
assertQuery(eligibleTableScans, "SELECT orderkey FROM tpch.tiny.orders", "SELECT orderkey FROM orders");
}
finally {
assertUpdate(session, "DROP TABLE IF EXISTS test_grouped_join1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import static com.facebook.presto.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.MAX_STAGE_RETRIES;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
Expand Down Expand Up @@ -312,56 +311,6 @@ public void testInsertUnbucketedTableWithGroupedExecution(int writerConcurrency)
"DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_failure"));
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT)
public void testScanFilterProjectionOnlyQueryOnUnbucketedTable(int writerConcurrency)
throws Exception
{
testRecoverableGroupedExecution(
queryRunner,
writerConcurrency,
ImmutableList.of(
"CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table AS\n" +
"SELECT t.comment\n" +
"FROM orders\n" +
"CROSS JOIN UNNEST(REPEAT(comment, 10)) AS t (comment)"),
"CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table_success AS\n" +
"SELECT comment value1 FROM scan_filter_projection_only_query_on_unbucketed_table",
"CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table_failure AS\n" +
"SELECT comment value1 FROM scan_filter_projection_only_query_on_unbucketed_table",
15000 * 10,
ImmutableList.of(
"DROP TABLE IF EXISTS scan_filter_projection_only_query_on_unbucketed_table",
"DROP TABLE IF EXISTS scan_filter_projection_only_query_on_unbucketed_table_success",
"DROP TABLE IF EXISTS scan_filter_projection_only_query_on_unbucketed_table_failure"));
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT)
public void testUnionAll(int writerConcurrency)
throws Exception
{
testRecoverableGroupedExecution(
queryRunner,
writerConcurrency,
ImmutableList.of(
"CREATE TABLE test_union_all AS\n" +
"SELECT t.comment\n" +
"FROM orders\n" +
"CROSS JOIN UNNEST(REPEAT(comment, 10)) AS t (comment)"),
"CREATE TABLE test_union_all_success AS\n" +
"SELECT comment value1 FROM test_union_all " +
"UNION ALL " +
"SELECT comment value1 FROM test_union_all",
"CREATE TABLE test_union_all_failure AS\n" +
"SELECT comment value1 FROM test_union_all " +
"UNION ALL " +
"SELECT comment value1 FROM test_union_all",
30000 * 10,
ImmutableList.of(
"DROP TABLE IF EXISTS test_union_all",
"DROP TABLE IF EXISTS test_union_all_success",
"DROP TABLE IF EXISTS test_union_all_failure"));
}

@Test(invocationCount = INVOCATION_COUNT)
public void testCountOnUnbucketedTable()
throws Exception
Expand Down Expand Up @@ -495,7 +444,6 @@ private static Session createRecoverableSession(int writerConcurrency)
.setSystemProperty(RECOVERABLE_GROUPED_EXECUTION, "true")
.setSystemProperty(SCALE_WRITERS, "false")
.setSystemProperty(REDISTRIBUTE_WRITES, "false")
.setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true")
.setSystemProperty(TASK_WRITER_COUNT, Integer.toString(writerConcurrency))
.setSystemProperty(TASK_PARTITIONED_WRITER_COUNT, Integer.toString(writerConcurrency))
.setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public final class SystemSessionProperties
public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct";
public static final String GROUPED_EXECUTION_FOR_AGGREGATION = "grouped_execution_for_aggregation";
public static final String GROUPED_EXECUTION_FOR_JOIN = "grouped_execution_for_join";
public static final String GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS = "grouped_execution_for_eligible_table_scans";
public static final String GROUPED_EXECUTION = "grouped_execution";
public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution";
public static final String RECOVERABLE_GROUPED_EXECUTION = "recoverable_grouped_execution";
Expand Down Expand Up @@ -257,11 +256,6 @@ public SystemSessionProperties(
"Use grouped execution for foin when possible",
featuresConfig.isGroupedExecutionForJoinEnabled(),
false),
booleanProperty(
GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS,
"Experimental: Use grouped execution for eligible table scans",
featuresConfig.isGroupedExecutionForEligibleTableScansEnabled(),
false),
booleanProperty(
GROUPED_EXECUTION,
"Use grouped execution when possible",
Expand Down Expand Up @@ -906,11 +900,6 @@ public static boolean isGroupedExecutionForJoinEnabled(Session session)
return session.getSystemProperty(GROUPED_EXECUTION_FOR_JOIN, Boolean.class) && isGroupedExecutionEnabled(session);
}

public static boolean isGroupedExecutionForEligibleTableScansEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, Boolean.class) && isGroupedExecutionEnabled(session);
}

public static boolean isGroupedExecutionEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class FeaturesConfig
private boolean colocatedJoinsEnabled = true;
private boolean groupedExecutionForAggregationEnabled = true;
private boolean groupedExecutionForJoinEnabled = true;
private boolean groupedExecutionForEligibleTableScansEnabled = true;
private boolean groupedExecutionEnabled = true;
private boolean dynamicScheduleForGroupedExecution = true;
private boolean recoverableGroupedExecutionEnabled;
Expand Down Expand Up @@ -399,19 +398,6 @@ public FeaturesConfig setGroupedExecutionForJoinEnabled(boolean groupedExecution
return this;
}

public boolean isGroupedExecutionForEligibleTableScansEnabled()
{
return groupedExecutionForEligibleTableScansEnabled;
}

@Config("experimental.grouped-execution-for-eligible-table-scans-enabled")
@ConfigDescription("Experimental: Use grouped execution for eligible table scans")
public FeaturesConfig setGroupedExecutionForEligibleTableScansEnabled(boolean groupedExecutionForEligibleTableScansEnabled)
{
this.groupedExecutionForEligibleTableScansEnabled = groupedExecutionForEligibleTableScansEnabled;
return this;
}

public boolean isGroupedExecutionEnabled()
{
return groupedExecutionEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import static com.facebook.presto.SystemSessionProperties.isDynamicScheduleForGroupedExecution;
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForEligibleTableScansEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForJoinEnabled;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
Expand Down Expand Up @@ -1158,7 +1157,7 @@ public GroupedExecutionProperties visitTableScan(TableScanNode node, Void contex
else {
return new GroupedExecutionProperties(
true,
isGroupedExecutionForEligibleTableScansEnabled(session),
false,
ImmutableList.of(node.getId()),
partitionHandles.size(),
metadata.getConnectorCapabilities(session, node.getTable().getConnectorId()).contains(SUPPORTS_REWINDABLE_SPLIT_SOURCE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public void testDefaults()
.setJoinMaxBroadcastTableSize(null)
.setGroupedExecutionForAggregationEnabled(true)
.setGroupedExecutionForJoinEnabled(true)
.setGroupedExecutionForEligibleTableScansEnabled(true)
.setDynamicScheduleForGroupedExecutionEnabled(true)
.setGroupedExecutionEnabled(true)
.setRecoverableGroupedExecutionEnabled(false)
Expand Down Expand Up @@ -168,7 +167,6 @@ public void testExplicitPropertyMappings()
.put("join-max-broadcast-table-size", "42GB")
.put("grouped-execution-for-aggregation-enabled", "false")
.put("grouped-execution-for-join-enabled", "false")
.put("experimental.grouped-execution-for-eligible-table-scans-enabled", "false")
.put("grouped-execution-enabled", "false")
.put("dynamic-schedule-for-grouped-execution", "false")
.put("recoverable-grouped-execution-enabled", "true")
Expand Down Expand Up @@ -253,7 +251,6 @@ public void testExplicitPropertyMappings()
.setJoinMaxBroadcastTableSize(new DataSize(42, GIGABYTE))
.setGroupedExecutionForAggregationEnabled(false)
.setGroupedExecutionForJoinEnabled(false)
.setGroupedExecutionForEligibleTableScansEnabled(false)
.setGroupedExecutionEnabled(false)
.setDynamicScheduleForGroupedExecutionEnabled(false)
.setRecoverableGroupedExecutionEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForAggregationEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForEligibleTableScansEnabled;
import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForJoinEnabled;
import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
Expand All @@ -49,8 +48,7 @@ public void verify(SparkContext sparkContext, Session session)
verify(!isDistributedSortEnabled(session), "distributed sort is not supported");
verify(getExchangeMaterializationStrategy(session) == NONE, "exchange materialization is not supported");
verify(getPartitioningProviderCatalog(session).equals(GlobalSystemConnector.NAME), "partitioning provider other that system is not supported");
verify(!isGroupedExecutionForEligibleTableScansEnabled(session) &&
!isGroupedExecutionForAggregationEnabled(session) &&
verify(!isGroupedExecutionForAggregationEnabled(session) &&
!isRecoverableGroupedExecutionEnabled(session) &&
!isDynamicScheduleForGroupedExecution(session) &&
!isGroupedExecutionForJoinEnabled(session) &&
Expand Down