Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ private Optional<Integer> determinePartitionCount(
}
verify(minPartitionCount <= maxPartitionCount, "minPartitionCount %s larger than maxPartitionCount %s",
minPartitionCount, maxPartitionCount);
int maxPossiblePartitionCount = taskCountEstimator.estimateHashedTaskCount(session);
RetryPolicy retryPolicy = getRetryPolicy(session);
if (maxPossiblePartitionCount <= 2 * minPartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
// Do not set partition count if the possible partition count is already close to the minimum partition count.
// This avoids incurring cost of fetching table statistics for simple queries when the cluster is small.
return Optional.empty();
}

StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, tableStatsProvider);
long queryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
Expand Down Expand Up @@ -188,7 +195,7 @@ private Optional<Integer> determinePartitionCount(
return Optional.empty();
}

if (partitionCount * 2 >= taskCountEstimator.estimateHashedTaskCount(session) && !getRetryPolicy(session).equals(RetryPolicy.TASK)) {
if (partitionCount * 2 >= maxPossiblePartitionCount && !retryPolicy.equals(RetryPolicy.TASK)) {
// Do not cap partition count if it's already close to the possible number of tasks.
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,51 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
node(TableScanNode.class))))))));
}

@Test
public void testDoesNotSetPartitionCountWhenNodeCountIsCloseToMinPartitionCount()
{
    @Language("SQL") String query =
            """
            SELECT count(column_a) FROM table_with_stats_a group by column_b
            """;

    // DeterminePartitionCount shouldn't set a partition count when 2 * MIN_HASH_PARTITION_COUNT
    // is greater than or equal to the number of workers: the cluster is already small enough that
    // fetching table statistics just to cap the partition count isn't worthwhile, so the remote
    // exchange is expected to carry Optional.empty() instead of an explicit partition count.
    assertDistributedPlan(
            query,
            Session.builder(getPlanTester().getDefaultSession())
                    .setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
                    .setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
                    .setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
                    .setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
                    .build(),
            output(
                    project(
                            node(AggregationNode.class,
                                    exchange(LOCAL,
                                            exchange(REMOTE, REPARTITION, Optional.empty(),
                                                    node(AggregationNode.class,
                                                            node(TableScanNode.class))))))));

    // DeterminePartitionCount should still set a partition count for FTE
    // (task-level retries): with RETRY_POLICY=task the small-cluster shortcut above is bypassed,
    // so the remote exchange is expected to carry an explicit partition count (Optional.of(10)).
    assertDistributedPlan(
            query,
            Session.builder(getPlanTester().getDefaultSession())
                    .setSystemProperty(RETRY_POLICY, "task")
                    .setSystemProperty(MAX_HASH_PARTITION_COUNT, "8")
                    .setSystemProperty(MIN_HASH_PARTITION_COUNT, "4")
                    .setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
                    .setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
                    .build(),
            output(
                    project(
                            node(AggregationNode.class,
                                    exchange(LOCAL,
                                            exchange(REMOTE, REPARTITION, Optional.of(10),
                                                    node(AggregationNode.class,
                                                            node(TableScanNode.class))))))));
}

@Test
public void testPlanWhenTableStatisticsAreAbsent()
{
Expand Down Expand Up @@ -539,15 +584,15 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
Session.builder(getPlanTester().getDefaultSession())
.setSystemProperty(RETRY_POLICY, "task")
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "21")
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "11")
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "10")
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
.build(),
output(
project(
node(AggregationNode.class,
exchange(LOCAL,
exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(11),
exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, Optional.of(10),
node(AggregationNode.class,
node(TableScanNode.class))))))));
}
Expand All @@ -563,7 +608,7 @@ SELECT count(column_a) FROM table_with_stats_a group by column_b
query,
Session.builder(getPlanTester().getDefaultSession())
.setSystemProperty(RETRY_POLICY, "task")
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "8")
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT, "9")
.setSystemProperty(FAULT_TOLERANT_EXECUTION_MIN_PARTITION_COUNT, "4")
.setSystemProperty(MIN_INPUT_SIZE_PER_TASK, "20MB")
.setSystemProperty(MIN_INPUT_ROWS_PER_TASK, "400")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ public void testReadUnpartitionedTable()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
.addCopies(new FileOperation(DATA, "no partition", "InputFile.newInput"), 2)
.build());

Expand Down Expand Up @@ -319,7 +318,6 @@ public void testReadWholePartition()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
.addCopies(new FileOperation(DATA, "key=p1/", "InputFile.newInput"), 2)
.addCopies(new FileOperation(DATA, "key=p2/", "InputFile.newInput"), 2)
.build());
Expand All @@ -337,7 +335,6 @@ public void testReadWholePartition()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
.build());

// Read partition column only, one partition only
Expand Down Expand Up @@ -368,7 +365,6 @@ public void testReadWholePartition()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", "InputFile.length"))
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
.build());

// Read only row count
Expand Down Expand Up @@ -419,7 +415,6 @@ public void testReadWholePartitionSplittableFile()
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", "InputFile.length"))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", "InputFile.length"))
.add(new FileOperation(TRINO_EXTENDED_STATS_JSON, "extended_stats.json", "InputFile.newStream"))
.build());

// Read only row count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6829,7 +6829,7 @@ public void testInvalidAnalyzePartitionedTable()
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[NULL])", tableName), ".*Invalid null value in analyze partitions property.*");

// Test non-existent partition
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4', '10']])", tableName), ".*Partition.*not found.*");
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4', '10']])", tableName), "Partition .* no longer exists.*|Partition.*not found.*");

// Test partition schema mismatch
assertQueryFails(format("ANALYZE %s WITH (partitions = ARRAY[ARRAY['p4']])", tableName), "Partition value count does not match partition column count");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2005,7 +2005,7 @@ public void testAnalyzePartitionedTableWithCanonicalization()
(null, null, null, null, 5.0, null, null)
""");
// TODO (https://github.com/trinodb/trino/issues/15998) fix selective ANALYZE for table with non-canonical partition values
assertQueryFails("ANALYZE " + getFullyQualifiedTestTableName(externalTableName) + " WITH (partitions = ARRAY[ARRAY['4']])", ".*Partition.*not found.*");
assertQueryFails("ANALYZE " + getFullyQualifiedTestTableName(externalTableName) + " WITH (partitions = ARRAY[ARRAY['4']])", "Partition month=4 no longer exists.*");

assertUpdate("DROP TABLE " + getFullyQualifiedTestTableName(externalTableName));
assertUpdate("DROP TABLE " + getFullyQualifiedTestTableName(tableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ public void testAnalyzePartitionedTable()
.addCopies(GET_TABLE, 1)
.add(GET_PARTITION_NAMES_BY_FILTER)
.add(GET_PARTITIONS_BY_NAMES)
.add(GET_PARTITION_COLUMN_STATISTICS)
.add(UPDATE_PARTITION_STATISTICS)
.build());

Expand All @@ -304,7 +303,6 @@ public void testAnalyzePartitionedTable()
.add(GET_TABLE)
.add(GET_PARTITION_NAMES_BY_FILTER)
.add(GET_PARTITIONS_BY_NAMES)
.add(GET_PARTITION_COLUMN_STATISTICS)
.add(UPDATE_PARTITION_STATISTICS)
.build());
}
Expand Down