diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java
index 2a6428450a2b..87a3acb1134b 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java
@@ -161,6 +161,13 @@ private Optional<Integer> determinePartitionCount(
         }
         verify(minPartitionCount <= maxPartitionCount, "minPartitionCount %s larger than maxPartitionCount %s", minPartitionCount, maxPartitionCount);
 
+        int maxPossiblePartitionCount = taskCountEstimator.estimateHashedTaskCount(session);
+        RetryPolicy retryPolicy = getRetryPolicy(session);
+        if (maxPossiblePartitionCount <= 2 * minPartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
+            // Do not set partition count if the possible partition count is already close to the minimum partition count.
+            // This avoids incurring the cost of fetching table statistics for simple queries when the cluster is small.
+            return Optional.empty();
+        }
         StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, tableStatsProvider);
         long queryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
 
@@ -188,7 +195,7 @@ private Optional<Integer> determinePartitionCount(
             return Optional.empty();
         }
 
-        if (partitionCount * 2 >= taskCountEstimator.estimateHashedTaskCount(session) && !getRetryPolicy(session).equals(RetryPolicy.TASK)) {
+        if (partitionCount * 2 >= maxPossiblePartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
             // Do not cap partition count if it's already close to the possible number of tasks.
             return Optional.empty();
         }
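
The optimizer change above reduces to a small guard that runs before any table statistics are fetched, which is where the savings come from; under task-level retries an explicit partition count is still wanted, hence the RetryPolicy.TASK exclusion. A minimal, self-contained sketch of that heuristic (the class and method names below are illustrative only, not part of this patch):

// Illustrative sketch of the early-exit heuristic added to DeterminePartitionCount.
// Names are hypothetical; only the inequality mirrors the patch.
public class PartitionCountGuardSketch
{
    // Skip stats-driven partition count determination when the cluster cannot run
    // meaningfully more tasks than the configured minimum anyway; fault-tolerant
    // execution (task retries) still wants an explicit count, so it never skips.
    static boolean shouldSkip(int maxPossiblePartitionCount, int minPartitionCount, boolean taskRetriesEnabled)
    {
        return maxPossiblePartitionCount <= 2 * minPartitionCount && !taskRetriesEnabled;
    }

    public static void main(String[] args)
    {
        System.out.println(shouldSkip(8, 4, false));   // true: 8 <= 2 * 4, skip and avoid the stats fetch
        System.out.println(shouldSkip(8, 4, true));    // false: task retries always determine a count
        System.out.println(shouldSkip(100, 4, false)); // false: large cluster, worth consulting stats
    }
}
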
diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestDeterminePartitionCount.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestDeterminePartitionCount.java
index 17d782a920e6..f12559d10031 100644
--- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestDeterminePartitionCount.java
+++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestDeterminePartitionCount.java
@@ -241,6 +241,51 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
                                         node(TableScanNode.class))))))));
     }
 
+    @Test
+    public void testDoesNotSetPartitionCountWhenNodeCountIsCloseToMinPartitionCount()
+    {
+        @Language("SQL") String query =
+                """
+                SELECT count(column_a) FROM table_with_stats_a group by column_b
+                """;
+
+        // DeterminePartitionCount shouldn't set the partition count when 2 * MIN_HASH_PARTITION_COUNT
+        // is greater than or equal to the number of workers.
+        assertDistributedPlan(
+                query,
+                Session.builder(getPlanTester().getDefaultSession())
+                        .setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
+                        .setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
+                        .setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
+                        .setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
+                        .build(),
+                output(
+                        project(
+                                node(AggregationNode.class,
+                                        exchange(LOCAL,
+                                                exchange(REMOTE, REPARTITION, Optional.empty(),
+                                                        node(AggregationNode.class,
+                                                                node(TableScanNode.class))))))));
+
+        // DeterminePartitionCount should still set the partition count for FTE
+        assertDistributedPlan(
+                query,
+                Session.builder(getPlanTester().getDefaultSession())
+                        .setSystemProperty(RETRY_POLICY, "task")
+                        .setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
+                        .setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
+                        .setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
+                        .setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
+                        .build(),
+                output(
+                        project(
+                                node(AggregationNode.class,
+                                        exchange(LOCAL,
+                                                exchange(REMOTE, REPARTITION, Optional.of(10),
+                                                        node(AggregationNode.class,
+                                                                node(TableScanNode.class))))))));
+    }
+
     @Test
     public void testPlanWhenTableStatisticsAreAbsent()
     {
@@ -539,7 +584,7 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
                 Session.builder(getPlanTester().getDefaultSession())
                         .setSystemProperty(RETRY_POLICY, "task")
                         .setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "21")
-                        .setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "11")
+                        .setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "10")
                         .setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
                         .setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
                         .build(),
@@ -547,7 +592,7 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
                 project(
                         node(AggregationNode.class,
                                 exchange(LOCAL,
-                                        exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(11),
+                                        exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(10),
                                                 node(AggregationNode.class,
                                                         node(TableScanNode.class))))))));
     }
@@ -563,7 +608,7 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
                 query,
                 Session.builder(getPlanTester().getDefaultSession())
                         .setSystemProperty(RETRY_POLICY, "task")
-                        .setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "8")
+                        .setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "9")
                         .setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "4")
                         .setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
                         .setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
index f6a2463bfcf2..0d8a0451a263 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
@@ -225,7 +225,6 @@ public void testReadUnpartitionedTable()
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
-                .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
                 .addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2)
                 .build());
 
@@ -319,7 +318,6 @@ public void testReadWholePartition()
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
-                .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
                 .addCopies(new FileOperation(DATA, "key=p1/", "InputFile.newInput"), 2)
                 .addCopies(new FileOperation(DATA, "key=p2/", "InputFile.newInput"), 2)
                 .build());
@@ -337,7 +335,6 @@ public void testReadWholePartition()
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
-                .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
                 .build());
 
         // Read partition column only, one partition only
@@ -368,7 +365,6 @@ public void testReadWholePartition()
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
-                .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
                 .build());
 
         // Read only row count
@@ -419,7 +415,6 @@ public void testReadWholePartitionSplittableFile()
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
                 .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
-                .add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
                 .build());
 
         // Read only row count
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
index c76634275bb0..135798f5287b 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java
@@ -6829,7 +6829,7 @@ public void testInvalidAnalyzePartitionedTable()
         assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[NULL])", tableName), ".*Invalid null value in analyze partitions property.*");
 
         // Test non-existed partition
-        assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4', '10']])", tableName), ".*Partition.*not found.*");
+        assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4', '10']])", tableName), "Partition .* no longer exists.*|Partition.*not found.*");
 
         // Test partition schema mismatch
         assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4']])", tableName), "Partition value count does not match partition column count");
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
index 52ab78234ca4..c86c8fcfe34e 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java
@@ -2005,7 +2005,7 @@ public void testAnalyzePartitionedTableWithCanonicalization()
                 (null, null, null, null, 5.0, null, null)
                 """);
         // TODO (https://github.com/trinodb/trino/issues/15998) fix selective ANALYZE for table with non-canonical partition values
-        assertQueryFails("ANALYZE " + getFullyQualifiedTestTableName(externalTableName) + " WITH (partitions = ARRAY[ARRAY['4']])", ".*Partition.*not found.*");
+        assertQueryFails("ANALYZE " + getFullyQualifiedTestTableName(externalTableName) + " WITH (partitions = ARRAY[ARRAY['4']])", "Partition month=4 no longer exists.*");
 
         assertUpdate("DROP TABLE " + getFullyQualifiedTestTableName(externalTableName));
         assertUpdate("DROP TABLE " + getFullyQualifiedTestTableName(tableName));
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java
index 06d0c8cc2343..e94154fe0f1c 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java
@@ -293,7 +293,6 @@ public void testAnalyzePartitionedTable()
                 .addCopies(GET_TABLE, 1)
                 .add(GET_PARTITION_NAMES_BY_FILTER)
                 .add(GET_PARTITIONS_BY_NAMES)
-                .add(GET_PARTITION_COLUMN_STATISTICS)
                 .add(UPDATE_PARTITION_STATISTICS)
                 .build());
 
@@ -304,7 +303,6 @@ public void testAnalyzePartitionedTable()
                 .add(GET_TABLE)
                 .add(GET_PARTITION_NAMES_BY_FILTER)
                 .add(GET_PARTITIONS_BY_NAMES)
-                .add(GET_PARTITION_COLUMN_STATISTICS)
                 .add(UPDATE_PARTITION_STATISTICS)
                 .build());
     }
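
A note on the relaxed ANALYZE expectation in BaseHiveConnectorTest above: the assertion now accepts either failure message, which appears to track the change in when the missing partition is detected (the thrift test above shows ANALYZE no longer issues GET_PARTITION_COLUMN_STATISTICS up front). A quick standalone check of the alternation (illustrative only; AnalyzeErrorPatternCheck and the sample messages are not part of the patch):

import java.util.regex.Pattern;

// Verifies that the relaxed expectation matches both failure messages.
public class AnalyzeErrorPatternCheck
{
    public static void main(String[] args)
    {
        Pattern expected = Pattern.compile("Partition .* no longer exists.*|Partition.*not found.*");
        // New-style message, asserted verbatim in BaseTestHiveOnDataLake:
        System.out.println(expected.matcher("Partition month=4 no longer exists").matches()); // true
        // Old-style message, still possible depending on the metastore code path:
        System.out.println(expected.matcher("Partition {p4, 10} not found").matches()); // true
    }
}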