diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index c26f74526ea71..c3235ce868433 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -77,7 +77,6 @@ import static com.facebook.presto.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION; import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY; import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION; -import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS; import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.PARTIAL_MERGE_PUSHDOWN_STRATEGY; import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG; @@ -3355,13 +3354,11 @@ private void testGroupedExecution(Session session) Session notColocated = Session.builder(session) .setSystemProperty(COLOCATED_JOIN, "false") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "false") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "false") .build(); // Co-located JOIN with all groups at once, fixed schedule Session colocatedAllGroupsAtOnce = Session.builder(session) .setSystemProperty(COLOCATED_JOIN, "true") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "0") .setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "false") .build(); @@ -3369,7 +3366,6 @@ private void testGroupedExecution(Session session) Session colocatedOneGroupAtATime = Session.builder(session) .setSystemProperty(COLOCATED_JOIN, "true") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1") .setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "false") .build(); @@ -3377,7 +3373,6 @@ private void testGroupedExecution(Session session) Session colocatedAllGroupsAtOnceDynamic = Session.builder(session) .setSystemProperty(COLOCATED_JOIN, "true") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "0") .setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true") .build(); @@ -3385,7 +3380,6 @@ private void testGroupedExecution(Session session) Session colocatedOneGroupAtATimeDynamic = Session.builder(session) .setSystemProperty(COLOCATED_JOIN, "true") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1") .setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true") .build(); @@ -3394,7 +3388,6 @@ private void testGroupedExecution(Session session) .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) .setSystemProperty(COLOCATED_JOIN, "true") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1") .build(); @@ -3403,17 +3396,10 @@ private void testGroupedExecution(Session session) .setSystemProperty(JOIN_DISTRIBUTION_TYPE, BROADCAST.name()) .setSystemProperty(COLOCATED_JOIN, "true") .setSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, "true") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1") .setSystemProperty(DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION, "true") .build(); - // Eligible table scans only, 1 group per worker at a time - Session eligibleTableScans = Session.builder(session) - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") - .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1") - .build(); - // // HASH JOIN // ========= @@ -3822,18 +3808,6 @@ private void testGroupedExecution(Session session) assertQuery(notColocated, joinMismatchedBuckets, expectedJoinMismatchedBuckets); assertQuery(colocatedAllGroupsAtOnce, joinMismatchedBuckets, expectedJoinMismatchedBuckets, assertRemoteExchangesCount(1)); assertQuery(colocatedOneGroupAtATime, joinMismatchedBuckets, expectedJoinMismatchedBuckets, assertRemoteExchangesCount(1)); - - // - // Eligible table scans only - // ============================= - @Language("SQL") String tableScan = "SELECT key1, value1 FROM test_grouped_join1"; - @Language("SQL") String expectedTableScan = "SELECT orderkey, comment FROM orders"; - - assertQuery(notColocated, tableScan, expectedTableScan); - assertQuery(eligibleTableScans, tableScan, expectedTableScan); - - // Input connector returns non-empty TablePartitioning for unpartitioned table. - assertQuery(eligibleTableScans, "SELECT orderkey FROM tpch.tiny.orders", "SELECT orderkey FROM orders"); } finally { assertUpdate(session, "DROP TABLE IF EXISTS test_grouped_join1"); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java index d62288513e1f5..7afd10c95afb9 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveRecoverableExecution.java @@ -43,7 +43,6 @@ import static com.facebook.presto.SystemSessionProperties.DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION; import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY; import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_AGGREGATION; -import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS; import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT; import static com.facebook.presto.SystemSessionProperties.MAX_STAGE_RETRIES; import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG; @@ -312,56 +311,6 @@ public void testInsertUnbucketedTableWithGroupedExecution(int writerConcurrency) "DROP TABLE IF EXISTS insert_unbucketed_table_with_grouped_execution_failure")); } - @Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT) - public void testScanFilterProjectionOnlyQueryOnUnbucketedTable(int writerConcurrency) - throws Exception - { - testRecoverableGroupedExecution( - queryRunner, - writerConcurrency, - ImmutableList.of( - "CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table AS\n" + - "SELECT t.comment\n" + - "FROM orders\n" + - "CROSS JOIN UNNEST(REPEAT(comment, 10)) AS t (comment)"), - "CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table_success AS\n" + - "SELECT comment value1 FROM scan_filter_projection_only_query_on_unbucketed_table", - "CREATE TABLE scan_filter_projection_only_query_on_unbucketed_table_failure AS\n" + - "SELECT comment value1 FROM scan_filter_projection_only_query_on_unbucketed_table", - 15000 * 10, - ImmutableList.of( - "DROP TABLE IF EXISTS scan_filter_projection_only_query_on_unbucketed_table", - "DROP TABLE IF EXISTS scan_filter_projection_only_query_on_unbucketed_table_success", - "DROP TABLE IF EXISTS scan_filter_projection_only_query_on_unbucketed_table_failure")); - } - - @Test(timeOut = TEST_TIMEOUT, dataProvider = "writerConcurrency", invocationCount = INVOCATION_COUNT) - public void testUnionAll(int writerConcurrency) - throws Exception - { - testRecoverableGroupedExecution( - queryRunner, - writerConcurrency, - ImmutableList.of( - "CREATE TABLE test_union_all AS\n" + - "SELECT t.comment\n" + - "FROM orders\n" + - "CROSS JOIN UNNEST(REPEAT(comment, 10)) AS t (comment)"), - "CREATE TABLE test_union_all_success AS\n" + - "SELECT comment value1 FROM test_union_all " + - "UNION ALL " + - "SELECT comment value1 FROM test_union_all", - "CREATE TABLE test_union_all_failure AS\n" + - "SELECT comment value1 FROM test_union_all " + - "UNION ALL " + - "SELECT comment value1 FROM test_union_all", - 30000 * 10, - ImmutableList.of( - "DROP TABLE IF EXISTS test_union_all", - "DROP TABLE IF EXISTS test_union_all_success", - "DROP TABLE IF EXISTS test_union_all_failure")); - } - @Test(invocationCount = INVOCATION_COUNT) public void testCountOnUnbucketedTable() throws Exception @@ -495,7 +444,6 @@ private static Session createRecoverableSession(int writerConcurrency) .setSystemProperty(RECOVERABLE_GROUPED_EXECUTION, "true") .setSystemProperty(SCALE_WRITERS, "false") .setSystemProperty(REDISTRIBUTE_WRITES, "false") - .setSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, "true") .setSystemProperty(TASK_WRITER_COUNT, Integer.toString(writerConcurrency)) .setSystemProperty(TASK_PARTITIONED_WRITER_COUNT, Integer.toString(writerConcurrency)) .setSystemProperty(PARTITIONING_PROVIDER_CATALOG, "hive") diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 1977201d17237..274e1b61fec71 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -71,7 +71,6 @@ public final class SystemSessionProperties public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct"; public static final String GROUPED_EXECUTION_FOR_AGGREGATION = "grouped_execution_for_aggregation"; public static final String GROUPED_EXECUTION_FOR_JOIN = "grouped_execution_for_join"; - public static final String GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS = "grouped_execution_for_eligible_table_scans"; public static final String GROUPED_EXECUTION = "grouped_execution"; public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution"; public static final String RECOVERABLE_GROUPED_EXECUTION = "recoverable_grouped_execution"; @@ -257,11 +256,6 @@ public SystemSessionProperties( "Use grouped execution for foin when possible", featuresConfig.isGroupedExecutionForJoinEnabled(), false), - booleanProperty( - GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, - "Experimental: Use grouped execution for eligible table scans", - featuresConfig.isGroupedExecutionForEligibleTableScansEnabled(), - false), booleanProperty( GROUPED_EXECUTION, "Use grouped execution when possible", @@ -906,11 +900,6 @@ public static boolean isGroupedExecutionForJoinEnabled(Session session) return session.getSystemProperty(GROUPED_EXECUTION_FOR_JOIN, Boolean.class) && isGroupedExecutionEnabled(session); } - public static boolean isGroupedExecutionForEligibleTableScansEnabled(Session session) - { - return session.getSystemProperty(GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS, Boolean.class) && isGroupedExecutionEnabled(session); - } - public static boolean isGroupedExecutionEnabled(Session session) { return session.getSystemProperty(GROUPED_EXECUTION, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index dc91d0a7ceed2..0d6f575b492bd 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -70,7 +70,6 @@ public class FeaturesConfig private boolean colocatedJoinsEnabled = true; private boolean groupedExecutionForAggregationEnabled = true; private boolean groupedExecutionForJoinEnabled = true; - private boolean groupedExecutionForEligibleTableScansEnabled = true; private boolean groupedExecutionEnabled = true; private boolean dynamicScheduleForGroupedExecution = true; private boolean recoverableGroupedExecutionEnabled; @@ -399,19 +398,6 @@ public FeaturesConfig setGroupedExecutionForJoinEnabled(boolean groupedExecution return this; } - public boolean isGroupedExecutionForEligibleTableScansEnabled() - { - return groupedExecutionForEligibleTableScansEnabled; - } - - @Config("experimental.grouped-execution-for-eligible-table-scans-enabled") - @ConfigDescription("Experimental: Use grouped execution for eligible table scans") - public FeaturesConfig setGroupedExecutionForEligibleTableScansEnabled(boolean groupedExecutionForEligibleTableScansEnabled) - { - this.groupedExecutionForEligibleTableScansEnabled = groupedExecutionForEligibleTableScansEnabled; - return this; - } - public boolean isGroupedExecutionEnabled() { return groupedExecutionEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index a0a965f40b622..5c536dc582e48 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -96,7 +96,6 @@ import static com.facebook.presto.SystemSessionProperties.isDynamicScheduleForGroupedExecution; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForAggregationEnabled; -import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForEligibleTableScansEnabled; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForJoinEnabled; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; @@ -1158,7 +1157,7 @@ public GroupedExecutionProperties visitTableScan(TableScanNode node, Void contex else { return new GroupedExecutionProperties( true, - isGroupedExecutionForEligibleTableScansEnabled(session), + false, ImmutableList.of(node.getId()), partitionHandles.size(), metadata.getConnectorCapabilities(session, node.getTable().getConnectorId()).contains(SUPPORTS_REWINDABLE_SPLIT_SOURCE)); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 6ab8101aa421f..a46b75d7a2432 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -59,7 +59,6 @@ public void testDefaults() .setJoinMaxBroadcastTableSize(null) .setGroupedExecutionForAggregationEnabled(true) .setGroupedExecutionForJoinEnabled(true) - .setGroupedExecutionForEligibleTableScansEnabled(true) .setDynamicScheduleForGroupedExecutionEnabled(true) .setGroupedExecutionEnabled(true) .setRecoverableGroupedExecutionEnabled(false) @@ -168,7 +167,6 @@ public void testExplicitPropertyMappings() .put("join-max-broadcast-table-size", "42GB") .put("grouped-execution-for-aggregation-enabled", "false") .put("grouped-execution-for-join-enabled", "false") - .put("experimental.grouped-execution-for-eligible-table-scans-enabled", "false") .put("grouped-execution-enabled", "false") .put("dynamic-schedule-for-grouped-execution", "false") .put("recoverable-grouped-execution-enabled", "true") @@ -253,7 +251,6 @@ public void testExplicitPropertyMappings() .setJoinMaxBroadcastTableSize(new DataSize(42, GIGABYTE)) .setGroupedExecutionForAggregationEnabled(false) .setGroupedExecutionForJoinEnabled(false) - .setGroupedExecutionForEligibleTableScansEnabled(false) .setGroupedExecutionEnabled(false) .setDynamicScheduleForGroupedExecutionEnabled(false) .setRecoverableGroupedExecutionEnabled(true) diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java index 68555ba1c08dd..9f873b3fa9d17 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java @@ -28,7 +28,6 @@ import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForAggregationEnabled; -import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForEligibleTableScansEnabled; import static com.facebook.presto.SystemSessionProperties.isGroupedExecutionForJoinEnabled; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; @@ -49,8 +48,7 @@ public void verify(SparkContext sparkContext, Session session) verify(!isDistributedSortEnabled(session), "distributed sort is not supported"); verify(getExchangeMaterializationStrategy(session) == NONE, "exchange materialization is not supported"); verify(getPartitioningProviderCatalog(session).equals(GlobalSystemConnector.NAME), "partitioning provider other that system is not supported"); - verify(!isGroupedExecutionForEligibleTableScansEnabled(session) && - !isGroupedExecutionForAggregationEnabled(session) && + verify(!isGroupedExecutionForAggregationEnabled(session) && !isRecoverableGroupedExecutionEnabled(session) && !isDynamicScheduleForGroupedExecution(session) && !isGroupedExecutionForJoinEnabled(session) &&