Skip to content

Commit 0fa7351

Browse files
committed
Avoid fetching of statistics from DeterminePartitionCount in small cluster
When the max possible tasks count is close to the min partition count, it is not worth paying the cost of fetching statistics to determine optimal partitions count
1 parent 5f87c92 commit 0fa7351

File tree

6 files changed

+58
-13
lines changed

6 files changed

+58
-13
lines changed

core/trino-main/src/main/java/io/trino/sql/planner/optimizations/DeterminePartitionCount.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,13 @@ private Optional<Integer> determinePartitionCount(
161161
}
162162
verify(minPartitionCount <= maxPartitionCount, "minPartitionCount %s larger than maxPartitionCount %s",
163163
minPartitionCount, maxPartitionCount);
164+
int maxPossiblePartitionCount = taskCountEstimator.estimateHashedTaskCount(session);
165+
RetryPolicy retryPolicy = getRetryPolicy(session);
166+
if (maxPossiblePartitionCount <= 2 * minPartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
167+
// Do not set partition count if the possible partition count is already close to the minimum partition count.
168+
// This avoids incurring cost of fetching table statistics for simple queries when the cluster is small.
169+
return Optional.empty();
170+
}
164171

165172
StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, tableStatsProvider);
166173
long queryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
@@ -188,7 +195,7 @@ private Optional<Integer> determinePartitionCount(
188195
return Optional.empty();
189196
}
190197

191-
if (partitionCount * 2 >= taskCountEstimator.estimateHashedTaskCount(session) && !getRetryPolicy(session).equals(RetryPolicy.TASK)) {
198+
if (partitionCount * 2 >= maxPossiblePartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
192199
// Do not cap partition count if it's already close to the possible number of tasks.
193200
return Optional.empty();
194201
}

core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestDeterminePartitionCount.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,51 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
241241
node(TableScanNode.class))))))));
242242
}
243243

244+
@Test
245+
public void testDoesNotSetPartitionCountWhenNodeCountIsCloseToMinPartitionCount()
246+
{
247+
@Language("SQL") String query =
248+
"""
249+
SELECT count(column_a) FROM table_with_stats_a group by column_b
250+
""";
251+
252+
// DeterminePartitionCount shouldn't put partition count when 2 * MIN_HASH_PARTITION_COUNT
253+
// is greater or equal to number of workers.
254+
assertDistributedPlan(
255+
query,
256+
Session.builder(getPlanTester().getDefaultSession())
257+
.setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
258+
.setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
259+
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
260+
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
261+
.build(),
262+
output(
263+
project(
264+
node(AggregationNode.class,
265+
exchange(LOCAL,
266+
exchange(REMOTE, REPARTITION, Optional.empty(),
267+
node(AggregationNode.class,
268+
node(TableScanNode.class))))))));
269+
270+
// DeterminePartitionCount should still put partition count for FTE
271+
assertDistributedPlan(
272+
query,
273+
Session.builder(getPlanTester().getDefaultSession())
274+
.setSystemProperty(RETRY_POLICY, "task")
275+
.setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
276+
.setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
277+
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
278+
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
279+
.build(),
280+
output(
281+
project(
282+
node(AggregationNode.class,
283+
exchange(LOCAL,
284+
exchange(REMOTE, REPARTITION, Optional.of(10),
285+
node(AggregationNode.class,
286+
node(TableScanNode.class))))))));
287+
}
288+
244289
@Test
245290
public void testPlanWhenTableStatisticsAreAbsent()
246291
{
@@ -539,15 +584,15 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
539584
Session.builder(getPlanTester().getDefaultSession())
540585
.setSystemProperty(RETRY_POLICY, "task")
541586
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "21")
542-
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "11")
587+
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "10")
543588
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
544589
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
545590
.build(),
546591
output(
547592
project(
548593
node(AggregationNode.class,
549594
exchange(LOCAL,
550-
exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(11),
595+
exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(10),
551596
node(AggregationNode.class,
552597
node(TableScanNode.class))))))));
553598
}
@@ -563,7 +608,7 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
563608
query,
564609
Session.builder(getPlanTester().getDefaultSession())
565610
.setSystemProperty(RETRY_POLICY, "task")
566-
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "8")
611+
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "9")
567612
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "4")
568613
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
569614
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ public void testReadUnpartitionedTable()
225225
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
226226
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
227227
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
228-
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
229228
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2)
230229
.build());
231230

@@ -319,7 +318,6 @@ public void testReadWholePartition()
319318
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
320319
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
321320
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
322-
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
323321
.addCopies(new FileOperation(DATA, "key=p1/", "InputFile.newInput"), 2)
324322
.addCopies(new FileOperation(DATA, "key=p2/", "InputFile.newInput"), 2)
325323
.build());
@@ -337,7 +335,6 @@ public void testReadWholePartition()
337335
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
338336
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
339337
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
340-
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
341338
.build());
342339

343340
// Read partition column only, one partition only
@@ -368,7 +365,6 @@ public void testReadWholePartition()
368365
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
369366
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
370367
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
371-
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
372368
.build());
373369

374370
// Read only row count
@@ -419,7 +415,6 @@ public void testReadWholePartitionSplittableFile()
419415
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
420416
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
421417
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
422-
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
423418
.build());
424419

425420
// Read only row count

plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6829,7 +6829,7 @@ public void testInvalidAnalyzePartitionedTable()
68296829
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[NULL])", tableName), ".*Invalid null value in analyze partitions property.*");
68306830

68316831
// Test non-existed partition
6832-
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4', '10']])", tableName), ".*Partition.*not found.*");
6832+
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4', '10']])", tableName), "Partition .* no longer exists.*|Partition.*not found.*");
68336833

68346834
// Test partition schema mismatch
68356835
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4']])", tableName), "Partition value count does not match partition column count");

plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2005,7 +2005,7 @@ public void testAnalyzePartitionedTableWithCanonicalization()
20052005
(null, null, null, null, 5.0, null, null)
20062006
""");
20072007
// TODO (https://github.com/trinodb/trino/issues/15998) fix selective ANALYZE for table with non-canonical partition values
2008-
assertQueryFails("ANALYZE " + getFullyQualifiedTestTableName(externalTableName) + " WITH (partitions = ARRAY[ARRAY['4']])", ".*Partition.*not found.*");
2008+
assertQueryFails("ANALYZE " + getFullyQualifiedTestTableName(externalTableName) + " WITH (partitions = ARRAY[ARRAY['4']])", "Partition month=4 no longer exists.*");
20092009

20102010
assertUpdate("DROP TABLE " + getFullyQualifiedTestTableName(externalTableName));
20112011
assertUpdate("DROP TABLE " + getFullyQualifiedTestTableName(tableName));

plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,6 @@ public void testAnalyzePartitionedTable()
293293
.addCopies(GET_TABLE, 1)
294294
.add(GET_PARTITION_NAMES_BY_FILTER)
295295
.add(GET_PARTITIONS_BY_NAMES)
296-
.add(GET_PARTITION_COLUMN_STATISTICS)
297296
.add(UPDATE_PARTITION_STATISTICS)
298297
.build());
299298

@@ -304,7 +303,6 @@ public void testAnalyzePartitionedTable()
304303
.add(GET_TABLE)
305304
.add(GET_PARTITION_NAMES_BY_FILTER)
306305
.add(GET_PARTITIONS_BY_NAMES)
307-
.add(GET_PARTITION_COLUMN_STATISTICS)
308306
.add(UPDATE_PARTITION_STATISTICS)
309307
.build());
310308
}

0 commit comments

Comments
 (0)