diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 6716dc4fd532..9f8caf51f119 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -80,7 +80,6 @@ public final class SystemSessionProperties public static final String QUERY_MAX_STAGE_COUNT = "query_max_stage_count"; public static final String REDISTRIBUTE_WRITES = "redistribute_writes"; public static final String USE_PREFERRED_WRITE_PARTITIONING = "use_preferred_write_partitioning"; - public static final String PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS = "preferred_write_partitioning_min_number_of_partitions"; public static final String SCALE_WRITERS = "scale_writers"; public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled"; public static final String MAX_WRITER_TASKS_COUNT = "max_writer_tasks_count"; @@ -306,16 +305,6 @@ public SystemSessionProperties( "Use preferred write partitioning", optimizerConfig.isUsePreferredWritePartitioning(), false), - integerProperty( - PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, - "Use preferred write partitioning when the number of written partitions exceeds the configured threshold", - optimizerConfig.getPreferredWritePartitioningMinNumberOfPartitions(), - value -> { - if (value < 1) { - throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be greater than or equal to 1: %s", PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, value)); - } - }, - false), booleanProperty( SCALE_WRITERS, "Scale out writers based on throughput (use minimum necessary)", @@ -1093,11 +1082,6 @@ public static boolean isUsePreferredWritePartitioning(Session session) return session.getSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, Boolean.class); } - public static int getPreferredWritePartitioningMinNumberOfPartitions(Session 
session) - { - return session.getSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, Integer.class); - } - public static boolean isScaleWriters(Session session) { return session.getSystemProperty(SCALE_WRITERS, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java index e188c911beff..9449058ff58e 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java @@ -69,6 +69,7 @@ public class PagePartitioner private final PageSerializer serializer; private final PositionsAppenderPageBuilder[] positionsAppenders; private final boolean replicatesAnyRow; + private final boolean partitionProcessRleAndDictionaryBlocks; private final int nullChannel; // when >= 0, send the position to every partition if this channel is null private PartitionedOutputInfoSupplier partitionedOutputInfoSupplier; @@ -86,7 +87,8 @@ public PagePartitioner( DataSize maxMemory, PositionsAppenderFactory positionsAppenderFactory, Optional exchangeEncryptionKey, - AggregatedMemoryContext aggregatedMemoryContext) + AggregatedMemoryContext aggregatedMemoryContext, + boolean partitionProcessRleAndDictionaryBlocks) { this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null"); this.partitionChannels = Ints.toArray(requireNonNull(partitionChannels, "partitionChannels is null")); @@ -104,6 +106,7 @@ public PagePartitioner( this.nullChannel = nullChannel.orElse(-1); this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); this.serializer = serdeFactory.createSerializer(exchangeEncryptionKey.map(Ciphers::deserializeAesEncryptionKey)); + this.partitionProcessRleAndDictionaryBlocks = partitionProcessRleAndDictionaryBlocks; // Ensure partition channels align with constant arguments provided for (int i = 0; i < 
this.partitionChannels.length; i++) { @@ -125,6 +128,11 @@ public PagePartitioner( updateMemoryUsage(); } + public PartitionFunction getPartitionFunction() + { + return partitionFunction; + } + // sets up this partitioner for the new operator public void setupOperator(OperatorContext operatorContext) { @@ -232,12 +240,12 @@ private IntArrayList[] partitionPositions(Page page) Page partitionFunctionArgs = getPartitionFunctionArguments(page); - if (partitionFunctionArgs.getChannelCount() > 0 && onlyRleBlocks(partitionFunctionArgs)) { + if (partitionProcessRleAndDictionaryBlocks && partitionFunctionArgs.getChannelCount() > 0 && onlyRleBlocks(partitionFunctionArgs)) { // we need at least one Rle block since with no blocks partition function // can return a different value per invocation (e.g. RoundRobinBucketFunction) partitionBySingleRleValue(page, position, partitionFunctionArgs, partitionPositions); } - else if (partitionFunctionArgs.getChannelCount() == 1 && isDictionaryProcessingFaster(partitionFunctionArgs.getBlock(0))) { + else if (partitionProcessRleAndDictionaryBlocks && partitionFunctionArgs.getChannelCount() == 1 && isDictionaryProcessingFaster(partitionFunctionArgs.getBlock(0))) { partitionBySingleDictionary(page, position, partitionFunctionArgs, partitionPositions); } else { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java b/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java index 5b5aa956bcfc..be23b52ec017 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java @@ -60,6 +60,7 @@ public static class PartitionedOutputFactory private final Optional exchangeEncryptionKey; private final AggregatedMemoryContext memoryContext; private final int pagePartitionerPoolSize; + private final Optional skewedPartitionRebalancer; public 
PartitionedOutputFactory( PartitionFunction partitionFunction, @@ -72,7 +73,8 @@ public PartitionedOutputFactory( PositionsAppenderFactory positionsAppenderFactory, Optional exchangeEncryptionKey, AggregatedMemoryContext memoryContext, - int pagePartitionerPoolSize) + int pagePartitionerPoolSize, + Optional skewedPartitionRebalancer) { this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null"); this.partitionChannels = requireNonNull(partitionChannels, "partitionChannels is null"); @@ -85,6 +87,7 @@ public PartitionedOutputFactory( this.exchangeEncryptionKey = requireNonNull(exchangeEncryptionKey, "exchangeEncryptionKey is null"); this.memoryContext = requireNonNull(memoryContext, "memoryContext is null"); this.pagePartitionerPoolSize = pagePartitionerPoolSize; + this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null"); } @Override @@ -111,7 +114,8 @@ public OperatorFactory createOutputOperator( positionsAppenderFactory, exchangeEncryptionKey, memoryContext, - pagePartitionerPoolSize); + pagePartitionerPoolSize, + skewedPartitionRebalancer); } } @@ -134,6 +138,7 @@ public static class PartitionedOutputOperatorFactory private final Optional exchangeEncryptionKey; private final AggregatedMemoryContext memoryContext; private final int pagePartitionerPoolSize; + private final Optional skewedPartitionRebalancer; private final PagePartitionerPool pagePartitionerPool; public PartitionedOutputOperatorFactory( @@ -152,7 +157,8 @@ public PartitionedOutputOperatorFactory( PositionsAppenderFactory positionsAppenderFactory, Optional exchangeEncryptionKey, AggregatedMemoryContext memoryContext, - int pagePartitionerPoolSize) + int pagePartitionerPoolSize, + Optional skewedPartitionRebalancer) { this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); @@ -170,21 +176,33 @@ public PartitionedOutputOperatorFactory( this.exchangeEncryptionKey = 
requireNonNull(exchangeEncryptionKey, "exchangeEncryptionKey is null"); this.memoryContext = requireNonNull(memoryContext, "memoryContext is null"); this.pagePartitionerPoolSize = pagePartitionerPoolSize; + this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null"); this.pagePartitionerPool = new PagePartitionerPool( pagePartitionerPoolSize, - () -> new PagePartitioner( - partitionFunction, - partitionChannels, - partitionConstants, - replicatesAnyRow, - nullChannel, - outputBuffer, - serdeFactory, - sourceTypes, - maxMemory, - positionsAppenderFactory, - exchangeEncryptionKey, - memoryContext)); + () -> { + boolean partitionProcessRleAndDictionaryBlocks = true; + PartitionFunction function = partitionFunction; + if (skewedPartitionRebalancer.isPresent()) { + function = new SkewedPartitionFunction(partitionFunction, skewedPartitionRebalancer.get()); + // Partition flattened Rle and Dictionary blocks since if they are scaled then we want to + // round-robin the entire block to increase the writing parallelism across tasks/workers. 
+ partitionProcessRleAndDictionaryBlocks = false; + } + return new PagePartitioner( + function, + partitionChannels, + partitionConstants, + replicatesAnyRow, + nullChannel, + outputBuffer, + serdeFactory, + sourceTypes, + maxMemory, + positionsAppenderFactory, + exchangeEncryptionKey, + memoryContext, + partitionProcessRleAndDictionaryBlocks); + }); } @Override @@ -195,7 +213,8 @@ public Operator createOperator(DriverContext driverContext) operatorContext, pagePreprocessor, outputBuffer, - pagePartitionerPool); + pagePartitionerPool, + skewedPartitionRebalancer); } @Override @@ -223,14 +242,16 @@ public OperatorFactory duplicate() positionsAppenderFactory, exchangeEncryptionKey, memoryContext, - pagePartitionerPoolSize); + pagePartitionerPoolSize, + skewedPartitionRebalancer); } } private final OperatorContext operatorContext; private final Function pagePreprocessor; private final PagePartitionerPool pagePartitionerPool; - private final PagePartitioner partitionFunction; + private final PagePartitioner pagePartitioner; + private final Optional skewedPartitionRebalancer; // outputBuffer is used only to block the operator from finishing if the outputBuffer is full private final OutputBuffer outputBuffer; private ListenableFuture isBlocked = NOT_BLOCKED; @@ -240,14 +261,16 @@ public PartitionedOutputOperator( OperatorContext operatorContext, Function pagePreprocessor, OutputBuffer outputBuffer, - PagePartitionerPool pagePartitionerPool) + PagePartitionerPool pagePartitionerPool, + Optional skewedPartitionRebalancer) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); this.pagePreprocessor = requireNonNull(pagePreprocessor, "pagePreprocessor is null"); this.pagePartitionerPool = requireNonNull(pagePartitionerPool, "pagePartitionerPool is null"); this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); - this.partitionFunction = requireNonNull(pagePartitionerPool.poll(), "partitionFunction is null"); - 
this.partitionFunction.setupOperator(operatorContext); + this.pagePartitioner = requireNonNull(pagePartitionerPool.poll(), "pagePartitioner is null"); + this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null"); + this.pagePartitioner.setupOperator(operatorContext); } @Override @@ -260,7 +283,7 @@ public OperatorContext getOperatorContext() public void finish() { if (!finished) { - pagePartitionerPool.release(partitionFunction); + pagePartitionerPool.release(pagePartitioner); finished = true; } } @@ -309,7 +332,22 @@ public void addInput(Page page) } page = pagePreprocessor.apply(page); - partitionFunction.partitionPage(page); + pagePartitioner.partitionPage(page); + + // Rebalance skewed partitions in the case of scale writer hash partitioning + if (skewedPartitionRebalancer.isPresent()) { + SkewedPartitionRebalancer rebalancer = skewedPartitionRebalancer.get(); + + // Update data processed and partitionRowCount state + rebalancer.addDataProcessed(page.getSizeInBytes()); + ((SkewedPartitionFunction) pagePartitioner.getPartitionFunction()).flushPartitionRowCountToRebalancer(); + + // Rebalance only when output buffer is full. This resembles that the downstream writing stage is slow, and + // we could rebalance partitions to increase the concurrency at downstream stage. + if (!outputBuffer.isFull().isDone()) { + rebalancer.rebalance(); + } + } } @Override diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionFunction.java b/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionFunction.java new file mode 100644 index 000000000000..058e5e49a19a --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionFunction.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.trino.operator.output; + +import io.trino.operator.PartitionFunction; +import io.trino.spi.Page; + +import static java.util.Objects.requireNonNull; + +public class SkewedPartitionFunction + implements PartitionFunction +{ + private final PartitionFunction partitionFunction; + private final SkewedPartitionRebalancer skewedPartitionRebalancer; + + private final long[] partitionRowCount; + + public SkewedPartitionFunction(PartitionFunction partitionFunction, SkewedPartitionRebalancer skewedPartitionRebalancer) + { + this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null"); + this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null"); + + this.partitionRowCount = new long[partitionFunction.getPartitionCount()]; + } + + @Override + public int getPartitionCount() + { + return skewedPartitionRebalancer.getTaskCount(); + } + + @Override + public int getPartition(Page page, int position) + { + int partition = partitionFunction.getPartition(page, position); + return skewedPartitionRebalancer.getTaskId(partition, partitionRowCount[partition]++); + } + + public void flushPartitionRowCountToRebalancer() + { + for (int partition = 0; partition < partitionFunction.getPartitionCount(); partition++) { + skewedPartitionRebalancer.addPartitionRowCount(partition, partitionRowCount[partition]); + partitionRowCount[partition] = 0; + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java 
b/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java new file mode 100644 index 000000000000..da8964b81a2d --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/output/SkewedPartitionRebalancer.java @@ -0,0 +1,450 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import io.airlift.log.Logger; +import io.airlift.units.DataSize; +import io.trino.Session; +import io.trino.execution.resourcegroups.IndexedPriorityQueue; +import io.trino.operator.PartitionFunction; +import io.trino.spi.connector.ConnectorBucketNodeMap; +import io.trino.spi.type.Type; +import io.trino.sql.planner.NodePartitioningManager; +import io.trino.sql.planner.PartitioningHandle; +import io.trino.sql.planner.PartitioningScheme; +import io.trino.sql.planner.SystemPartitioningHandle; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongArray; +import java.util.stream.IntStream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static 
io.trino.sql.planner.PartitioningHandle.isScaledWriterHashDistribution; +import static java.lang.Double.isNaN; +import static java.lang.Math.ceil; +import static java.lang.Math.floorMod; +import static java.lang.Math.max; + +/** + * Helps in distributing big or skewed partitions across available tasks to improve the performance of + * partitioned writes. + *

+ * This rebalancer initializes a bunch of buckets for each task based on a given taskBucketCount and then tries to + * uniformly distribute partitions across those buckets. This helps to mitigate two problems: + * 1. Mitigate skewness across tasks. + * 2. Scale a few big partitions across tasks even if there's no skewness among them. This will essentially speed up + * local scaling without much impact on overall resource utilization. + *

+ * Example: + *

+ * Before: 3 tasks, 3 buckets per task, and 2 skewed partitions + * Task1 Task2 Task3 + * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 + * Bucket2 Bucket2 Bucket2 + * Bucket3 Bucket3 Bucket3 + *

+ * After rebalancing: + * Task1 Task2 Task3 + * Bucket1 (Part 1) Bucket1 (Part 2) Bucket1 (Part 1) + * Bucket2 (Part 2) Bucket2 (Part 1) Bucket2 (Part 2) + * Bucket3 Bucket3 Bucket3 + */ +@ThreadSafe +public class SkewedPartitionRebalancer +{ + private static final Logger log = Logger.get(SkewedPartitionRebalancer.class); + // Keep the scale writers partition count big enough such that we could rebalance skewed partitions + // at more granularity, thus leading to less resource utilization at writer stage. + private static final int SCALE_WRITERS_PARTITION_COUNT = 4096; + // If the percentage difference between the two different task buckets with maximum and minimum processed bytes + // since last rebalance is above 0.7 (or 70%), then we consider them skewed. + private static final double TASK_BUCKET_SKEWNESS_THRESHOLD = 0.7; + private static final long MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = DataSize.of(50, MEGABYTE).toBytes(); + + private final int partitionCount; + private final int taskCount; + private final int taskBucketCount; + private final long minPartitionDataProcessedRebalanceThreshold; + private final long minDataProcessedRebalanceThreshold; + + private final AtomicLongArray partitionRowCount; + private final AtomicLong dataProcessed; + private final AtomicLong dataProcessedAtLastRebalance; + + @GuardedBy("this") + private final long[] partitionDataSizeAtLastRebalance; + + @GuardedBy("this") + private final long[] partitionDataSizeSinceLastRebalancePerTask; + + @GuardedBy("this") + private final long[] estimatedTaskBucketDataSizeSinceLastRebalance; + + private final List> partitionAssignments; + + public static boolean checkCanScalePartitionsRemotely(Session session, int taskCount, PartitioningHandle partitioningHandle, NodePartitioningManager nodePartitioningManager) + { + // In case of connector partitioning, check if bucketToPartitions has fixed mapping or not. If it is fixed + // then we can't distribute a bucket across multiple tasks. 
+ boolean hasFixedNodeMapping = partitioningHandle.getCatalogHandle() + .map(handle -> nodePartitioningManager.getConnectorBucketNodeMap(session, partitioningHandle) + .map(ConnectorBucketNodeMap::hasFixedMapping) + .orElse(false)) + .orElse(false); + // Use skewed partition rebalancer only when there are more than one tasks + return taskCount > 1 && !hasFixedNodeMapping && isScaledWriterHashDistribution(partitioningHandle); + } + + public static PartitionFunction createPartitionFunction( + Session session, + NodePartitioningManager nodePartitioningManager, + PartitioningScheme scheme, + List partitionChannelTypes) + { + PartitioningHandle handle = scheme.getPartitioning().getHandle(); + // In case of SystemPartitioningHandle we can use arbitrary bucket count so that skewness mitigation + // is more granular. + // Whereas, in the case of connector partitioning we have to use connector provided bucketCount + // otherwise buckets will get mapped to tasks incorrectly which could affect skewness handling. + // + // For example: if there are 2 hive buckets, 2 tasks, and 10 artificial bucketCount then this + // could be how actual hive buckets are mapped to artificial buckets and tasks. + // + // hive bucket artificial bucket tasks + // 0 0, 2, 4, 6, 8 0, 0, 0, 0, 0 + // 1 1, 3, 5, 7, 9 1, 1, 1, 1, 1 + // + // Here rebalancing will happen slowly even if there's a skewness at task 0 or hive bucket 0 because + // five artificial buckets resemble the first hive bucket. Therefore, these artificial buckets + // have to write minPartitionDataProcessedRebalanceThreshold before they get scaled to task 1, which is slow + // compared to only a single hive bucket reaching the min limit. + int bucketCount = (handle.getConnectorHandle() instanceof SystemPartitioningHandle) + ? 
SCALE_WRITERS_PARTITION_COUNT + : nodePartitioningManager.getBucketNodeMap(session, handle).getBucketCount(); + return nodePartitioningManager.getPartitionFunction( + session, + scheme, + partitionChannelTypes, + IntStream.range(0, bucketCount).toArray()); + } + + public static SkewedPartitionRebalancer createSkewedPartitionRebalancer(int partitionCount, int taskCount, int taskPartitionedWriterCount, long minPartitionDataProcessedRebalanceThreshold) + { + // Keep the task bucket count to 50% of total local writers + int taskBucketCount = (int) ceil(0.5 * taskPartitionedWriterCount); + return new SkewedPartitionRebalancer(partitionCount, taskCount, taskBucketCount, minPartitionDataProcessedRebalanceThreshold); + } + + public static int getTaskCount(PartitioningScheme partitioningScheme) + { + // Todo: Handle skewness if there are more nodes/tasks than the buckets coming from connector + // https://github.com/trinodb/trino/issues/17254 + int[] bucketToPartition = partitioningScheme.getBucketToPartition() + .orElseThrow(() -> new IllegalArgumentException("Bucket to partition must be set before calculating taskCount")); + // Buckets can be greater than the actual partitions or tasks. Therefore, use max to find the actual taskCount. 
+ return IntStream.of(bucketToPartition).max().getAsInt() + 1; + } + + private SkewedPartitionRebalancer( + int partitionCount, + int taskCount, + int taskBucketCount, + long minPartitionDataProcessedRebalanceThreshold) + { + this.partitionCount = partitionCount; + this.taskCount = taskCount; + this.taskBucketCount = taskBucketCount; + this.minPartitionDataProcessedRebalanceThreshold = minPartitionDataProcessedRebalanceThreshold; + this.minDataProcessedRebalanceThreshold = max(minPartitionDataProcessedRebalanceThreshold, MIN_DATA_PROCESSED_REBALANCE_THRESHOLD); + + this.partitionRowCount = new AtomicLongArray(partitionCount); + this.dataProcessed = new AtomicLong(); + this.dataProcessedAtLastRebalance = new AtomicLong(); + + this.partitionDataSizeAtLastRebalance = new long[partitionCount]; + this.partitionDataSizeSinceLastRebalancePerTask = new long[partitionCount]; + this.estimatedTaskBucketDataSizeSinceLastRebalance = new long[taskCount * taskBucketCount]; + + int[] taskBucketIds = new int[taskCount]; + ImmutableList.Builder> partitionAssignments = ImmutableList.builder(); + for (int partition = 0; partition < partitionCount; partition++) { + int taskId = partition % taskCount; + int bucketId = taskBucketIds[taskId]++ % taskBucketCount; + partitionAssignments.add(new CopyOnWriteArrayList<>(ImmutableList.of(new TaskBucket(taskId, bucketId)))); + } + this.partitionAssignments = partitionAssignments.build(); + } + + @VisibleForTesting + List> getPartitionAssignments() + { + ImmutableList.Builder> assignedTasks = ImmutableList.builder(); + for (List partitionAssignment : partitionAssignments) { + List tasks = partitionAssignment.stream() + .map(taskBucket -> taskBucket.taskId) + .collect(toImmutableList()); + assignedTasks.add(tasks); + } + return assignedTasks.build(); + } + + public int getTaskCount() + { + return taskCount; + } + + public int getTaskId(int partitionId, long index) + { + List taskIds = partitionAssignments.get(partitionId); + return 
taskIds.get(floorMod(index, taskIds.size())).taskId; + } + + public void addDataProcessed(long dataSize) + { + dataProcessed.addAndGet(dataSize); + } + + public void addPartitionRowCount(int partition, long rowCount) + { + partitionRowCount.addAndGet(partition, rowCount); + } + + public void rebalance() + { + long currentDataProcessed = dataProcessed.get(); + if (shouldRebalance(currentDataProcessed)) { + rebalancePartitions(currentDataProcessed); + } + } + + private boolean shouldRebalance(long dataProcessed) + { + // Rebalance only when total bytes processed since last rebalance is greater than rebalance threshold + return (dataProcessed - dataProcessedAtLastRebalance.get()) >= minDataProcessedRebalanceThreshold; + } + + private synchronized void rebalancePartitions(long dataProcessed) + { + if (!shouldRebalance(dataProcessed)) { + return; + } + + long[] partitionDataSize = calculatePartitionDataSize(dataProcessed); + + // initialize partitionDataSizeSinceLastRebalancePerTask + for (int partition = 0; partition < partitionCount; partition++) { + int totalAssignedTasks = partitionAssignments.get(partition).size(); + partitionDataSizeSinceLastRebalancePerTask[partition] = + (partitionDataSize[partition] - partitionDataSizeAtLastRebalance[partition]) / totalAssignedTasks; + } + + // Initialize taskBucketMaxPartitions + List> taskBucketMaxPartitions = new ArrayList<>(taskCount * taskBucketCount); + for (int taskId = 0; taskId < taskCount; taskId++) { + for (int bucketId = 0; bucketId < taskBucketCount; bucketId++) { + taskBucketMaxPartitions.add(new IndexedPriorityQueue<>()); + } + } + + for (int partition = 0; partition < partitionCount; partition++) { + List taskAssignments = partitionAssignments.get(partition); + for (TaskBucket taskBucket : taskAssignments) { + IndexedPriorityQueue queue = taskBucketMaxPartitions.get(taskBucket.id); + queue.addOrUpdate(partition, partitionDataSizeSinceLastRebalancePerTask[partition]); + } + } + + // Initialize maxTaskBuckets and 
minTaskBuckets + IndexedPriorityQueue maxTaskBuckets = new IndexedPriorityQueue<>(); + IndexedPriorityQueue minTaskBuckets = new IndexedPriorityQueue<>(); + for (int taskId = 0; taskId < taskCount; taskId++) { + for (int bucketId = 0; bucketId < taskBucketCount; bucketId++) { + TaskBucket taskBucket = new TaskBucket(taskId, bucketId); + estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id] = + calculateTaskBucketDataSizeSinceLastRebalance(taskBucketMaxPartitions.get(taskBucket.id)); + maxTaskBuckets.addOrUpdate(taskBucket, estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id]); + minTaskBuckets.addOrUpdate(taskBucket, Long.MAX_VALUE - estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id]); + } + } + + rebalanceBasedOnTaskBucketSkewness(maxTaskBuckets, minTaskBuckets, taskBucketMaxPartitions, partitionDataSize); + dataProcessedAtLastRebalance.set(dataProcessed); + } + + private long[] calculatePartitionDataSize(long dataProcessed) + { + long totalPartitionRowCount = 0; + for (int partition = 0; partition < partitionCount; partition++) { + totalPartitionRowCount += partitionRowCount.get(partition); + } + + long[] partitionDataSize = new long[partitionCount]; + for (int partition = 0; partition < partitionCount; partition++) { + partitionDataSize[partition] = (partitionRowCount.get(partition) * dataProcessed) / totalPartitionRowCount; + } + + return partitionDataSize; + } + + private long calculateTaskBucketDataSizeSinceLastRebalance(IndexedPriorityQueue maxPartitions) + { + long estimatedDataSizeSinceLastRebalance = 0; + for (int partition : maxPartitions) { + estimatedDataSizeSinceLastRebalance += partitionDataSizeSinceLastRebalancePerTask[partition]; + } + return estimatedDataSizeSinceLastRebalance; + } + + private void rebalanceBasedOnTaskBucketSkewness( + IndexedPriorityQueue maxTaskBuckets, + IndexedPriorityQueue minTaskBuckets, + List> taskBucketMaxPartitions, + long[] partitionDataSize) + { + while (true) { + TaskBucket maxTaskBucket = 
maxTaskBuckets.poll(); + if (maxTaskBucket == null) { + break; + } + + IndexedPriorityQueue maxPartitions = taskBucketMaxPartitions.get(maxTaskBucket.id); + if (maxPartitions.isEmpty()) { + continue; + } + + List minSkewedTaskBuckets = findSkewedMinTaskBuckets(maxTaskBucket, minTaskBuckets); + if (minSkewedTaskBuckets.isEmpty()) { + break; + } + + while (true) { + Integer maxPartition = maxPartitions.poll(); + if (maxPartition == null) { + break; + } + + if (partitionDataSizeSinceLastRebalancePerTask[maxPartition] >= minPartitionDataProcessedRebalanceThreshold) { + for (TaskBucket minTaskBucket : minSkewedTaskBuckets) { + if (rebalancePartition(maxPartition, minTaskBucket, maxTaskBuckets, minTaskBuckets, partitionDataSize[maxPartition])) { + break; + } + } + } + else { + break; + } + } + } + } + + private List findSkewedMinTaskBuckets(TaskBucket maxTaskBucket, IndexedPriorityQueue minTaskBuckets) + { + ImmutableList.Builder minSkewedTaskBuckets = ImmutableList.builder(); + for (TaskBucket minTaskBucket : minTaskBuckets) { + double skewness = + ((double) (estimatedTaskBucketDataSizeSinceLastRebalance[maxTaskBucket.id] + - estimatedTaskBucketDataSizeSinceLastRebalance[minTaskBucket.id])) + / estimatedTaskBucketDataSizeSinceLastRebalance[maxTaskBucket.id]; + if (skewness <= TASK_BUCKET_SKEWNESS_THRESHOLD || isNaN(skewness)) { + break; + } + if (maxTaskBucket.taskId != minTaskBucket.taskId) { + minSkewedTaskBuckets.add(minTaskBucket); + } + } + + return minSkewedTaskBuckets.build(); + } + + private boolean rebalancePartition( + int partitionId, + TaskBucket toTaskBucket, + IndexedPriorityQueue maxTasks, + IndexedPriorityQueue minTasks, + long partitionDataSize) + { + List assignments = partitionAssignments.get(partitionId); + if (assignments.stream().anyMatch(taskBucket -> taskBucket.taskId == toTaskBucket.taskId)) { + return false; + } + assignments.add(toTaskBucket); + + // Update the value of partitionDataSizeAtLastRebalance which will get used to calculate + // 
partitionDataSizeSinceLastRebalancePerTask in the next rebalancing cycle. + partitionDataSizeAtLastRebalance[partitionId] = partitionDataSize; + + int newTaskCount = assignments.size(); + int oldTaskCount = newTaskCount - 1; + // Since a partition is rebalanced from max to min skewed taskBucket, decrease the priority of max + // taskBucket as well as increase the priority of min taskBucket. + for (TaskBucket taskBucket : assignments) { + if (taskBucket.equals(toTaskBucket)) { + estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id] += + (partitionDataSizeSinceLastRebalancePerTask[partitionId] * oldTaskCount) / newTaskCount; + } + else { + estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id] -= + partitionDataSizeSinceLastRebalancePerTask[partitionId] / newTaskCount; + } + + maxTasks.addOrUpdate(taskBucket, estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id]); + minTasks.addOrUpdate(taskBucket, Long.MAX_VALUE - estimatedTaskBucketDataSizeSinceLastRebalance[taskBucket.id]); + } + + log.warn("Rebalanced partition %s to task %s with taskCount %s", partitionId, toTaskBucket.taskId, assignments.size()); + return true; + } + + private final class TaskBucket + { + private final int taskId; + private final int id; + + private TaskBucket(int taskId, int bucketId) + { + this.taskId = taskId; + // Unique id for this task and bucket + this.id = (taskId * taskBucketCount) + bucketId; + } + + @Override + public int hashCode() + { + return Objects.hash(id, id); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskBucket that = (TaskBucket) o; + return that.id == id; + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 9687da091827..191c4edc9c19 100644 --- 
a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -127,6 +127,7 @@ import io.trino.operator.join.unspilled.HashBuilderOperator; import io.trino.operator.output.PartitionedOutputOperator.PartitionedOutputFactory; import io.trino.operator.output.PositionsAppenderFactory; +import io.trino.operator.output.SkewedPartitionRebalancer; import io.trino.operator.output.TaskOutputOperator.TaskOutputFactory; import io.trino.operator.project.CursorProcessor; import io.trino.operator.project.PageProcessor; @@ -328,6 +329,10 @@ import static io.trino.operator.join.JoinUtils.isBuildSideReplicated; import static io.trino.operator.join.NestedLoopBuildOperator.NestedLoopBuildOperatorFactory; import static io.trino.operator.join.NestedLoopJoinOperator.NestedLoopJoinOperatorFactory; +import static io.trino.operator.output.SkewedPartitionRebalancer.checkCanScalePartitionsRemotely; +import static io.trino.operator.output.SkewedPartitionRebalancer.createPartitionFunction; +import static io.trino.operator.output.SkewedPartitionRebalancer.createSkewedPartitionRebalancer; +import static io.trino.operator.output.SkewedPartitionRebalancer.getTaskCount; import static io.trino.operator.window.pattern.PhysicalValuePointer.CLASSIFIER; import static io.trino.operator.window.pattern.PhysicalValuePointer.MATCH_NUMBER; import static io.trino.spi.StandardErrorCode.COMPILER_ERROR; @@ -566,7 +571,20 @@ public LocalExecutionPlan plan( .collect(toImmutableList()); } - PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(taskContext.getSession(), partitioningScheme, partitionChannelTypes); + PartitionFunction partitionFunction; + Optional skewedPartitionRebalancer = Optional.empty(); + int taskCount = getTaskCount(partitioningScheme); + if (checkCanScalePartitionsRemotely(taskContext.getSession(), taskCount, partitioningScheme.getPartitioning().getHandle(), 
nodePartitioningManager)) { + partitionFunction = createPartitionFunction(taskContext.getSession(), nodePartitioningManager, partitioningScheme, partitionChannelTypes); + skewedPartitionRebalancer = Optional.of(createSkewedPartitionRebalancer( + partitionFunction.getPartitionCount(), + taskCount, + getTaskPartitionedWriterCount(taskContext.getSession()), + getWriterMinSize(taskContext.getSession()).toBytes())); + } + else { + partitionFunction = nodePartitioningManager.getPartitionFunction(taskContext.getSession(), partitioningScheme, partitionChannelTypes); + } OptionalInt nullChannel = OptionalInt.empty(); Set partitioningColumns = partitioningScheme.getPartitioning().getColumns(); @@ -593,7 +611,8 @@ public LocalExecutionPlan plan( positionsAppenderFactory, taskContext.getSession().getExchangeEncryptionKey(), taskContext.newAggregateMemoryContext(), - getPagePartitioningBufferPoolSize(taskContext.getSession()))); + getPagePartitioningBufferPoolSize(taskContext.getSession()), + skewedPartitionRebalancer)); } public LocalExecutionPlan plan( @@ -3266,7 +3285,7 @@ public PhysicalOperation visitRefreshMaterializedView(RefreshMaterializedViewNod public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { // Set table writer count - int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getPreferredPartitioningScheme(), node.getSource()); + int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource()); context.setDriverInstanceCount(maxWriterCount); context.taskContext.setMaxWriterCount(maxWriterCount); @@ -3424,7 +3443,7 @@ public PhysicalOperation visitSimpleTableExecuteNode(SimpleTableExecuteNode node public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecutionPlanContext context) { // Set table writer count - int maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getPreferredPartitioningScheme(), node.getSource()); + int 
maxWriterCount = getWriterCount(session, node.getPartitioningScheme(), node.getSource()); context.setDriverInstanceCount(maxWriterCount); context.taskContext.setMaxWriterCount(maxWriterCount); @@ -3451,7 +3470,7 @@ public PhysicalOperation visitTableExecute(TableExecuteNode node, LocalExecution return new PhysicalOperation(operatorFactory, outputMapping.buildOrThrow(), context, source); } - private int getWriterCount(Session session, Optional partitioningScheme, Optional preferredPartitioningScheme, PlanNode source) + private int getWriterCount(Session session, Optional partitioningScheme, PlanNode source) { // This check is required because we don't know which writer count to use when exchange is // single distribution. It could be possible that when scaling is enabled, a single distribution is @@ -3461,18 +3480,12 @@ private int getWriterCount(Session session, Optional partiti return 1; } - if (isLocalScaledWriterExchange(source)) { - return partitioningScheme.or(() -> preferredPartitioningScheme) - // The default value of partitioned writer count is 32 which is high enough to use it - // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many - // small files since when scaling is disabled only single writer will handle a single partition. - .map(scheme -> getTaskPartitionedWriterCount(session)) - .orElseGet(() -> getTaskScaleWritersMaxWriterCount(session)); - } - + // The default value of partitioned writer count is 32 which is high enough to use it + // for both cases when scaling is enabled or not. Additionally, it doesn't lead to too many + // small files since when scaling is disabled only single writer will handle a single partition. return partitioningScheme .map(scheme -> getTaskPartitionedWriterCount(session)) - .orElseGet(() -> getTaskWriterCount(session)); + .orElseGet(() -> isLocalScaledWriterExchange(source) ? 
getTaskScaleWritersMaxWriterCount(session) : getTaskWriterCount(session)); } private boolean isSingleGatheringExchange(PlanNode node) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java index f78ce7aa1bcd..7b9fe1549a76 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LogicalPlanner.java @@ -130,6 +130,7 @@ import static io.trino.SystemSessionProperties.getMaxWriterTaskCount; import static io.trino.SystemSessionProperties.getRetryPolicy; import static io.trino.SystemSessionProperties.isCollectPlanStatisticsForAllQueries; +import static io.trino.SystemSessionProperties.isUsePreferredWritePartitioning; import static io.trino.metadata.MetadataUtil.createQualifiedObjectName; import static io.trino.spi.StandardErrorCode.CATALOG_NOT_FOUND; import static io.trino.spi.StandardErrorCode.CONSTRAINT_VIOLATION; @@ -660,7 +661,6 @@ private RelationPlan createTableWriterPlan( TableStatisticsMetadata statisticsMetadata) { Optional partitioningScheme = Optional.empty(); - Optional preferredPartitioningScheme = Optional.empty(); int maxWriterTasks = target.getMaxWriterTasks(plannerContext.getMetadata(), session).orElse(getMaxWriterTaskCount(session)); Optional maxWritersNodesCount = getRetryPolicy(session) != RetryPolicy.TASK @@ -683,9 +683,9 @@ private RelationPlan createTableWriterPlan( Partitioning.create(partitioningHandle.get(), partitionFunctionArguments), outputLayout)); } - else { + else if (isUsePreferredWritePartitioning(session)) { // empty connector partitioning handle means evenly partitioning on partitioning columns - preferredPartitioningScheme = Optional.of(new PartitioningScheme( + partitioningScheme = Optional.of(new PartitioningScheme( Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments), outputLayout, Optional.empty(), @@ -721,7 +721,6 @@ private 
RelationPlan createTableWriterPlan( symbols, columnNames, partitioningScheme, - preferredPartitioningScheme, Optional.of(partialAggregation), Optional.of(result.getDescriptor().map(aggregations.getMappings()::get))), target, @@ -743,7 +742,6 @@ private RelationPlan createTableWriterPlan( symbols, columnNames, partitioningScheme, - preferredPartitioningScheme, Optional.empty(), Optional.empty()), target, @@ -956,7 +954,6 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat // todo extract common method to be used here and in createTableWriterPlan() Optional partitioningScheme = Optional.empty(); - Optional preferredPartitioningScheme = Optional.empty(); if (layout.isPresent()) { List partitionFunctionArguments = new ArrayList<>(); layout.get().getPartitionColumns().stream() @@ -973,13 +970,13 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat Partitioning.create(partitioningHandle.get(), partitionFunctionArguments), outputLayout)); } - else { + else if (isUsePreferredWritePartitioning(session)) { // empty connector partitioning handle means evenly partitioning on partitioning columns int maxWriterTasks = tableExecuteTarget.getMaxWriterTasks(plannerContext.getMetadata(), session).orElse(getMaxWriterTaskCount(session)); Optional maxWritersNodesCount = getRetryPolicy(session) != RetryPolicy.TASK ? 
Optional.of(Math.min(maxWriterTasks, getMaxWriterTaskCount(session))) : Optional.empty(); - preferredPartitioningScheme = Optional.of(new PartitioningScheme( + partitioningScheme = Optional.of(new PartitioningScheme( Partitioning.create(FIXED_HASH_DISTRIBUTION, partitionFunctionArguments), outputLayout, Optional.empty(), @@ -1000,8 +997,7 @@ private RelationPlan createTableExecutePlan(Analysis analysis, TableExecute stat symbolAllocator.newSymbol("fragment", VARBINARY), symbols, columnNames, - partitioningScheme, - preferredPartitioningScheme), + partitioningScheme), tableExecuteTarget, symbolAllocator.newSymbol("rows", BIGINT), Optional.empty(), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java index 8e3640e6b63d..52aa09da8ce5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java @@ -30,7 +30,7 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MINUTES; -@DefunctConfig("adaptive-partial-aggregation.min-rows") +@DefunctConfig({"adaptive-partial-aggregation.min-rows", "preferred-write-partitioning-min-number-of-partitions"}) public class OptimizerConfig { private double cpuCostWeight = 75; @@ -57,7 +57,6 @@ public class OptimizerConfig private boolean distributedSort = true; private boolean usePreferredWritePartitioning = true; - private int preferredWritePartitioningMinNumberOfPartitions = 50; private Duration iterativeOptimizerTimeout = new Duration(3, MINUTES); // by default let optimizer wait a long time in case it retrieves some data from ConnectorMetadata @@ -373,20 +372,6 @@ public OptimizerConfig setUsePreferredWritePartitioning(boolean usePreferredWrit return this; } - @Min(1) - public int getPreferredWritePartitioningMinNumberOfPartitions() - { - return 
preferredWritePartitioningMinNumberOfPartitions; - } - - @Config("preferred-write-partitioning-min-number-of-partitions") - @ConfigDescription("Use preferred write partitioning when the number of written partitions exceeds the configured threshold") - public OptimizerConfig setPreferredWritePartitioningMinNumberOfPartitions(int preferredWritePartitioningMinNumberOfPartitions) - { - this.preferredWritePartitioningMinNumberOfPartitions = preferredWritePartitioningMinNumberOfPartitions; - return this; - } - public Duration getIterativeOptimizerTimeout() { return iterativeOptimizerTimeout; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java index ca534c4a696c..bb80d571f551 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java @@ -34,8 +34,6 @@ import io.trino.sql.planner.iterative.rule.AddDynamicFilterSource; import io.trino.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet; import io.trino.sql.planner.iterative.rule.AddIntermediateAggregations; -import io.trino.sql.planner.iterative.rule.ApplyPreferredTableExecutePartitioning; -import io.trino.sql.planner.iterative.rule.ApplyPreferredTableWriterPartitioning; import io.trino.sql.planner.iterative.rule.ApplyTableScanRedirection; import io.trino.sql.planner.iterative.rule.ArraySortAfterArrayDistinct; import io.trino.sql.planner.iterative.rule.CanonicalizeExpressions; @@ -757,16 +755,6 @@ public PlanOptimizers( statsCalculator, costCalculator, ImmutableSet.of(new RemoveRedundantIdentityProjections())), - // Prefer write partitioning rule requires accurate stats. - // Run it before reorder joins which also depends on accurate stats. 
- new IterativeOptimizer( - plannerContext, - ruleStats, - statsCalculator, - costCalculator, - ImmutableSet.of( - new ApplyPreferredTableWriterPartitioning(), - new ApplyPreferredTableExecutePartitioning())), // Because ReorderJoins runs only once, // PredicatePushDown, columnPruningOptimizer and RemoveRedundantIdentityProjections // need to run beforehand in order to produce an optimal join order diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableExecutePartitioning.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableExecutePartitioning.java deleted file mode 100644 index f65d7be2f1ca..000000000000 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableExecutePartitioning.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.sql.planner.iterative.rule; - -import io.trino.Session; -import io.trino.matching.Captures; -import io.trino.matching.Pattern; -import io.trino.operator.RetryPolicy; -import io.trino.sql.planner.iterative.Rule; -import io.trino.sql.planner.plan.TableExecuteNode; - -import java.util.Optional; - -import static io.trino.SystemSessionProperties.getPreferredWritePartitioningMinNumberOfPartitions; -import static io.trino.SystemSessionProperties.getRetryPolicy; -import static io.trino.SystemSessionProperties.isFaultTolerantExecutionForcePreferredWritePartitioningEnabled; -import static io.trino.SystemSessionProperties.isUsePreferredWritePartitioning; -import static io.trino.cost.AggregationStatsRule.getRowsCount; -import static io.trino.sql.planner.plan.Patterns.tableExecute; -import static java.lang.Double.isNaN; - -/** - * Replaces {@link TableExecuteNode} with {@link TableExecuteNode#getPreferredPartitioningScheme()} - * with a {@link TableExecuteNode} with {@link TableExecuteNode#getPartitioningScheme()} set. - */ -public class ApplyPreferredTableExecutePartitioning - implements Rule -{ - public static final Pattern TABLE_EXECUTE_NODE_WITH_PREFERRED_PARTITIONING = tableExecute() - .matching(node -> node.getPreferredPartitioningScheme().isPresent()); - - @Override - public Pattern getPattern() - { - return TABLE_EXECUTE_NODE_WITH_PREFERRED_PARTITIONING; - } - - @Override - public boolean isEnabled(Session session) - { - return isUsePreferredWritePartitioning(session); - } - - @Override - public Result apply(TableExecuteNode node, Captures captures, Context context) - { - if (getRetryPolicy(context.getSession()) == RetryPolicy.TASK && isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(context.getSession())) { - // Choosing preferred partitioning introduces a risk of running into a skew (for example when writing to only a single partition). 
- // Fault tolerant execution can detect a potential skew automatically (based on runtime statistics) and mitigate it by splitting skewed partitions. - return enable(node); - } - - int minimumNumberOfPartitions = getPreferredWritePartitioningMinNumberOfPartitions(context.getSession()); - if (minimumNumberOfPartitions <= 1) { - return enable(node); - } - - double expectedNumberOfPartitions = getRowsCount( - context.getStatsProvider().getStats(node.getSource()), - node.getPreferredPartitioningScheme().get().getPartitioning().getColumns()); - // Disable preferred partitioning at remote exchange level if stats are absent or estimated number of partitions - // are less than minimumNumberOfPartitions. This is because at remote exchange we don't have scaling to - // mitigate skewness. - // TODO - Remove this check after implementing skewness mitigation at remote exchange - https://github.com/trinodb/trino/issues/16178 - if (isNaN(expectedNumberOfPartitions) || (expectedNumberOfPartitions < minimumNumberOfPartitions)) { - return Result.empty(); - } - - return enable(node); - } - - private static Result enable(TableExecuteNode node) - { - return Result.ofPlanNode(new TableExecuteNode( - node.getId(), - node.getSource(), - node.getTarget(), - node.getRowCountSymbol(), - node.getFragmentSymbol(), - node.getColumns(), - node.getColumnNames(), - node.getPreferredPartitioningScheme(), - Optional.empty())); - } -} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableWriterPartitioning.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableWriterPartitioning.java deleted file mode 100644 index 2ddcc492059d..000000000000 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ApplyPreferredTableWriterPartitioning.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the 
License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.sql.planner.iterative.rule; - -import io.trino.Session; -import io.trino.matching.Captures; -import io.trino.matching.Pattern; -import io.trino.operator.RetryPolicy; -import io.trino.sql.planner.iterative.Rule; -import io.trino.sql.planner.plan.TableWriterNode; - -import java.util.Optional; - -import static io.trino.SystemSessionProperties.getPreferredWritePartitioningMinNumberOfPartitions; -import static io.trino.SystemSessionProperties.getRetryPolicy; -import static io.trino.SystemSessionProperties.isFaultTolerantExecutionForcePreferredWritePartitioningEnabled; -import static io.trino.SystemSessionProperties.isUsePreferredWritePartitioning; -import static io.trino.cost.AggregationStatsRule.getRowsCount; -import static io.trino.sql.planner.plan.Patterns.tableWriterNode; -import static java.lang.Double.isNaN; - -/** - * Rule verifies if preconditions for using preferred write partitioning are met: - * - expected number of partitions to be written (based on table stat) is greater - * than or equal to preferred_write_partitioning_min_number_of_partitions session property, - * - use_preferred_write_partitioning is set to true. - * - * If precondition are met the {@link TableWriterNode} is modified to mark the intention to use preferred write partitioning: - * value of {@link TableWriterNode#getPreferredPartitioningScheme()} is set as result of {@link TableWriterNode#getPartitioningScheme()}. 
- */ -public class ApplyPreferredTableWriterPartitioning - implements Rule -{ - public static final Pattern WRITER_NODE_WITH_PREFERRED_PARTITIONING = tableWriterNode() - .matching(node -> node.getPreferredPartitioningScheme().isPresent()); - - @Override - public Pattern getPattern() - { - return WRITER_NODE_WITH_PREFERRED_PARTITIONING; - } - - @Override - public boolean isEnabled(Session session) - { - return isUsePreferredWritePartitioning(session); - } - - @Override - public Result apply(TableWriterNode node, Captures captures, Context context) - { - if (getRetryPolicy(context.getSession()) == RetryPolicy.TASK && isFaultTolerantExecutionForcePreferredWritePartitioningEnabled(context.getSession())) { - // Choosing preferred partitioning introduces a risk of running into a skew (for example when writing to only a single partition). - // Fault tolerant execution can detect a potential skew automatically (based on runtime statistics) and mitigate it by splitting skewed partitions. - return enable(node); - } - - int minimumNumberOfPartitions = getPreferredWritePartitioningMinNumberOfPartitions(context.getSession()); - if (minimumNumberOfPartitions <= 1) { - return enable(node); - } - - double expectedNumberOfPartitions = getRowsCount( - context.getStatsProvider().getStats(node.getSource()), - node.getPreferredPartitioningScheme().get().getPartitioning().getColumns()); - // Disable preferred partitioning at remote exchange level if stats are absent or estimated number of partitions - // are less than minimumNumberOfPartitions. This is because at remote exchange we don't have scaling to - // mitigate skewness. 
- // TODO - Remove this check after implementing skewness mitigation at remote exchange - https://github.com/trinodb/trino/issues/16178 - if (isNaN(expectedNumberOfPartitions) || (expectedNumberOfPartitions < minimumNumberOfPartitions)) { - return Result.empty(); - } - - return enable(node); - } - - private Result enable(TableWriterNode node) - { - return Result.ofPlanNode(new TableWriterNode( - node.getId(), - node.getSource(), - node.getTarget(), - node.getRowCountSymbol(), - node.getFragmentSymbol(), - node.getColumns(), - node.getColumnNames(), - node.getPreferredPartitioningScheme(), - Optional.empty(), - node.getStatisticsAggregation(), - node.getStatisticsAggregationDescriptor())); - } -} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java index d99a9eb44d61..be475d120c7a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java @@ -689,7 +689,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, StreamPreferred return visitTableWriter( node, node.getPartitioningScheme(), - node.getPreferredPartitioningScheme(), node.getSource(), parentPreferences, node.getTarget()); @@ -701,7 +700,6 @@ public PlanWithProperties visitTableExecute(TableExecuteNode node, StreamPreferr return visitTableWriter( node, node.getPartitioningScheme(), - node.getPreferredPartitioningScheme(), node.getSource(), parentPreferences, node.getTarget()); @@ -710,7 +708,6 @@ public PlanWithProperties visitTableExecute(TableExecuteNode node, StreamPreferr private PlanWithProperties visitTableWriter( PlanNode node, Optional partitioningScheme, - Optional preferredPartitionScheme, PlanNode source, StreamPreferredProperties parentPreferences, WriterTarget writerTarget) @@ -718,8 +715,8 @@ private 
PlanWithProperties visitTableWriter( if (isTaskScaleWritersEnabled(session) && writerTarget.supportsReportingWrittenBytes(plannerContext.getMetadata(), session) && writerTarget.supportsMultipleWritersPerPartition(plannerContext.getMetadata(), session) - && (partitioningScheme.isPresent() || preferredPartitionScheme.isPresent())) { - return visitScalePartitionedWriter(node, partitioningScheme.orElseGet(preferredPartitionScheme::get), source); + && partitioningScheme.isPresent()) { + return visitScalePartitionedWriter(node, partitioningScheme.get(), source); } return partitioningScheme @@ -830,7 +827,7 @@ private PlanWithProperties visitScalePartitionedWriter(PlanNode node, Partitioni @Override public PlanWithProperties visitMergeWriter(MergeWriterNode node, StreamPreferredProperties parentPreferences) { - return visitTableWriter(node, node.getPartitioningScheme(), Optional.empty(), node.getSource(), parentPreferences, node.getTarget()); + return visitTableWriter(node, node.getPartitioningScheme(), node.getSource(), parentPreferences, node.getTarget()); } // diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java index 82445b9065b9..d73f4eab5c50 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java @@ -129,7 +129,6 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext map(partitioningScheme, source.getOutputSymbols())), - node.getPreferredPartitioningScheme().map(partitioningScheme -> map(partitioningScheme, source.getOutputSymbols())), node.getStatisticsAggregation().map(this::map), node.getStatisticsAggregationDescriptor().map(descriptor -> descriptor.map(this::map))); } @@ -529,8 +528,7 @@ public TableExecuteNode map(TableExecuteNode node, PlanNode source, PlanNodeId n 
map(node.getFragmentSymbol()), map(node.getColumns()), node.getColumnNames(), - node.getPartitioningScheme().map(partitioningScheme -> map(partitioningScheme, source.getOutputSymbols())), - node.getPreferredPartitioningScheme().map(partitioningScheme -> map(partitioningScheme, source.getOutputSymbols()))); + node.getPartitioningScheme().map(partitioningScheme -> map(partitioningScheme, source.getOutputSymbols()))); } public MergeWriterNode map(MergeWriterNode node, PlanNode source) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableExecuteNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableExecuteNode.java index f6b4877deb17..babf15a22414 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableExecuteNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableExecuteNode.java @@ -40,7 +40,6 @@ public class TableExecuteNode private final List columns; private final List columnNames; private final Optional partitioningScheme; - private final Optional preferredPartitioningScheme; private final List outputs; @JsonCreator @@ -52,8 +51,7 @@ public TableExecuteNode( @JsonProperty("fragmentSymbol") Symbol fragmentSymbol, @JsonProperty("columns") List columns, @JsonProperty("columnNames") List columnNames, - @JsonProperty("partitioningScheme") Optional partitioningScheme, - @JsonProperty("preferredPartitioningScheme") Optional preferredPartitioningScheme) + @JsonProperty("partitioningScheme") Optional partitioningScheme) { super(id); @@ -68,8 +66,6 @@ public TableExecuteNode( this.columns = ImmutableList.copyOf(columns); this.columnNames = ImmutableList.copyOf(columnNames); this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null"); - this.preferredPartitioningScheme = requireNonNull(preferredPartitioningScheme, "preferredPartitioningScheme is null"); - checkArgument(partitioningScheme.isEmpty() || preferredPartitioningScheme.isEmpty(), "Both partitioningScheme and 
preferredPartitioningScheme cannot be present"); ImmutableList.Builder outputs = ImmutableList.builder() .add(rowCountSymbol) @@ -119,12 +115,6 @@ public Optional getPartitioningScheme() return partitioningScheme; } - @JsonProperty - public Optional getPreferredPartitioningScheme() - { - return preferredPartitioningScheme; - } - @Override public List getSources() { @@ -154,7 +144,6 @@ public PlanNode replaceChildren(List newChildren) fragmentSymbol, columns, columnNames, - partitioningScheme, - preferredPartitioningScheme); + partitioningScheme); } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java index 3cdb2194e7c3..6f3d1793063d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java @@ -57,7 +57,6 @@ public class TableWriterNode private final List columns; private final List columnNames; private final Optional partitioningScheme; - private final Optional preferredPartitioningScheme; private final Optional statisticsAggregation; private final Optional> statisticsAggregationDescriptor; private final List outputs; @@ -72,7 +71,6 @@ public TableWriterNode( @JsonProperty("columns") List columns, @JsonProperty("columnNames") List columnNames, @JsonProperty("partitioningScheme") Optional partitioningScheme, - @JsonProperty("preferredPartitioningScheme") Optional preferredPartitioningScheme, @JsonProperty("statisticsAggregation") Optional statisticsAggregation, @JsonProperty("statisticsAggregationDescriptor") Optional> statisticsAggregationDescriptor) { @@ -89,11 +87,9 @@ public TableWriterNode( this.columns = ImmutableList.copyOf(columns); this.columnNames = ImmutableList.copyOf(columnNames); this.partitioningScheme = requireNonNull(partitioningScheme, "partitioningScheme is null"); - this.preferredPartitioningScheme = 
requireNonNull(preferredPartitioningScheme, "preferredPartitioningScheme is null"); this.statisticsAggregation = requireNonNull(statisticsAggregation, "statisticsAggregation is null"); this.statisticsAggregationDescriptor = requireNonNull(statisticsAggregationDescriptor, "statisticsAggregationDescriptor is null"); checkArgument(statisticsAggregation.isPresent() == statisticsAggregationDescriptor.isPresent(), "statisticsAggregation and statisticsAggregationDescriptor must be either present or absent"); - checkArgument(partitioningScheme.isEmpty() || preferredPartitioningScheme.isEmpty(), "Both partitioningScheme and preferredPartitioningScheme cannot be present"); ImmutableList.Builder outputs = ImmutableList.builder() .add(rowCountSymbol) @@ -147,12 +143,6 @@ public Optional getPartitioningScheme() return partitioningScheme; } - @JsonProperty - public Optional getPreferredPartitioningScheme() - { - return preferredPartitioningScheme; - } - @JsonProperty public Optional getStatisticsAggregation() { @@ -186,7 +176,7 @@ public R accept(PlanVisitor visitor, C context) @Override public PlanNode replaceChildren(List newChildren) { - return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, rowCountSymbol, fragmentSymbol, columns, columnNames, partitioningScheme, preferredPartitioningScheme, statisticsAggregation, statisticsAggregationDescriptor); + return new TableWriterNode(getId(), Iterables.getOnlyElement(newChildren), target, rowCountSymbol, fragmentSymbol, columns, columnNames, partitioningScheme, statisticsAggregation, statisticsAggregationDescriptor); } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type") diff --git a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java index 6ca859b272c8..327f09aa1c5f 100644 --- a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java +++ 
b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java @@ -51,7 +51,6 @@ public void testDefaults() .setColocatedJoinsEnabled(true) .setSpatialJoinsEnabled(true) .setUsePreferredWritePartitioning(true) - .setPreferredWritePartitioningMinNumberOfPartitions(50) .setEnableStatsCalculator(true) .setStatisticsPrecalculationForPushdownEnabled(true) .setCollectPlanStatisticsForAllQueries(false) @@ -119,7 +118,6 @@ public void testExplicitPropertyMappings() .put("spatial-joins-enabled", "false") .put("distributed-sort", "false") .put("use-preferred-write-partitioning", "false") - .put("preferred-write-partitioning-min-number-of-partitions", "10") .put("optimizer.optimize-metadata-queries", "true") .put("optimizer.optimize-hash-generation", "false") .put("optimizer.optimize-mixed-distinct-aggregations", "true") @@ -170,7 +168,6 @@ public void testExplicitPropertyMappings() .setColocatedJoinsEnabled(false) .setSpatialJoinsEnabled(false) .setUsePreferredWritePartitioning(false) - .setPreferredWritePartitioningMinNumberOfPartitions(10) .setDefaultFilterFactorEnabled(true) .setFilterConjunctionIndependenceFactor(1.0) .setNonEstimatablePredicateApproximationEnabled(false) diff --git a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java index 3c5615bc7f71..62f3b899068d 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/BenchmarkPartitionedOutputOperator.java @@ -467,7 +467,8 @@ private PartitionedOutputOperator createPartitionedOutputOperator() POSITIONS_APPENDER_FACTORY, Optional.empty(), newSimpleAggregatedMemoryContext(), - 0); + 0, + Optional.empty()); return (PartitionedOutputOperator) operatorFactory .createOutputOperator(0, new PlanNodeId("plan-node-0"), types, Function.identity(), serdeFactory) 
.createOperator(createDriverContext()); diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java index c30bc067358b..491949cbf6fe 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitioner.java @@ -634,7 +634,8 @@ public PartitionedOutputOperator buildPartitionedOutputOperator() POSITIONS_APPENDER_FACTORY, Optional.empty(), memoryContext, - 1); + 1, + Optional.empty()); OperatorFactory factory = operatorFactory.createOutputOperator(0, new PlanNodeId("plan-node-0"), types, Function.identity(), PAGES_SERDE_FACTORY); PartitionedOutputOperator operator = (PartitionedOutputOperator) factory .createOperator(driverContext); @@ -660,7 +661,8 @@ public PagePartitioner build() PARTITION_MAX_MEMORY, POSITIONS_APPENDER_FACTORY, Optional.empty(), - memoryContext); + memoryContext, + true); pagePartitioner.setupOperator(operatorContext); return pagePartitioner; diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitionerPool.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitionerPool.java index 032d84a0dbb6..cde9cfa15d98 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitionerPool.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPagePartitionerPool.java @@ -176,7 +176,8 @@ private static PartitionedOutputOperatorFactory createFactory(DataSize maxPagePa new PositionsAppenderFactory(new BlockTypeOperators()), Optional.empty(), memoryContext, - 2); + 2, + Optional.empty()); } private long processSplitsConcurrently(PartitionedOutputOperatorFactory factory, AggregatedMemoryContext memoryContext, Page... 
splits) diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java b/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java new file mode 100644 index 000000000000..176bf7f0e2f2 --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestSkewedPartitionRebalancer.java @@ -0,0 +1,206 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import com.google.common.collect.ImmutableList; +import io.airlift.units.DataSize; +import io.trino.SequencePageBuilder; +import io.trino.operator.PartitionFunction; +import io.trino.spi.Page; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.List; + +import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.operator.output.SkewedPartitionRebalancer.createSkewedPartitionRebalancer; +import static io.trino.spi.type.BigintType.BIGINT; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestSkewedPartitionRebalancer +{ + private static final long MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD = DataSize.of(1, MEGABYTE).toBytes(); + + @Test + public void testRebalanceWithSkewness() + { + int partitionCount = 3; + SkewedPartitionRebalancer rebalancer = createSkewedPartitionRebalancer(partitionCount, 3, 6, MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD); + 
SkewedPartitionFunction function = new SkewedPartitionFunction(new TestPartitionFunction(partitionCount), rebalancer); + + rebalancer.addPartitionRowCount(0, 1000); + rebalancer.addPartitionRowCount(1, 1000); + rebalancer.addPartitionRowCount(2, 1000); + rebalancer.addDataProcessed(DataSize.of(40, MEGABYTE).toBytes()); + // No rebalancing will happen since data processed is less than 50MB limit + rebalancer.rebalance(); + + assertThat(getPartitionPositions(function, 17)) + .containsExactly( + new IntArrayList(ImmutableList.of(0, 3, 6, 9, 12, 15)), + new IntArrayList(ImmutableList.of(1, 4, 7, 10, 13, 16)), + new IntArrayList(ImmutableList.of(2, 5, 8, 11, 14))); + assertThat(rebalancer.getPartitionAssignments()) + .containsExactly(ImmutableList.of(0), ImmutableList.of(1), ImmutableList.of(2)); + + rebalancer.addPartitionRowCount(0, 1000); + rebalancer.addPartitionRowCount(1, 1000); + rebalancer.addPartitionRowCount(2, 1000); + rebalancer.addDataProcessed(DataSize.of(20, MEGABYTE).toBytes()); + // Rebalancing will happen since we crossed the data processed limit. 
+ // Part0 -> Task1 (Bucket1), Part1 -> Task0 (Bucket1), Part2 -> Task0 (Bucket2) + rebalancer.rebalance(); + + assertThat(getPartitionPositions(function, 17)) + .containsExactly( + new IntArrayList(ImmutableList.of(0, 2, 4, 6, 8, 10, 12, 14, 16)), + new IntArrayList(ImmutableList.of(1, 3, 7, 9, 13, 15)), + new IntArrayList(ImmutableList.of(5, 11))); + assertThat(rebalancer.getPartitionAssignments()) + .containsExactly(ImmutableList.of(0, 1), ImmutableList.of(1, 0), ImmutableList.of(2, 0)); + + rebalancer.addPartitionRowCount(0, 1000); + rebalancer.addPartitionRowCount(1, 1000); + rebalancer.addPartitionRowCount(2, 1000); + rebalancer.addDataProcessed(DataSize.of(200, MEGABYTE).toBytes()); + // Rebalancing will happen + // Part0 -> Task2 (Bucket1), Part1 -> Task2 (Bucket2), Part2 -> Task1 (Bucket2) + rebalancer.rebalance(); + + assertThat(getPartitionPositions(function, 17)) + .containsExactly( + new IntArrayList(ImmutableList.of(0, 2, 4, 9, 11, 13)), + new IntArrayList(ImmutableList.of(1, 3, 5, 10, 12, 14)), + new IntArrayList(ImmutableList.of(6, 7, 8, 15, 16))); + assertThat(rebalancer.getPartitionAssignments()) + .containsExactly(ImmutableList.of(0, 1, 2), ImmutableList.of(1, 0, 2), ImmutableList.of(2, 0, 1)); + } + + @Test + public void testRebalanceWithoutSkewness() + { + int partitionCount = 6; + SkewedPartitionRebalancer rebalancer = createSkewedPartitionRebalancer(partitionCount, 3, 4, MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD); + SkewedPartitionFunction function = new SkewedPartitionFunction(new TestPartitionFunction(partitionCount), rebalancer); + + rebalancer.addPartitionRowCount(0, 1000); + rebalancer.addPartitionRowCount(1, 700); + rebalancer.addPartitionRowCount(2, 600); + rebalancer.addPartitionRowCount(3, 1000); + rebalancer.addPartitionRowCount(4, 700); + rebalancer.addPartitionRowCount(5, 600); + rebalancer.addDataProcessed(DataSize.of(500, MEGABYTE).toBytes()); + // No rebalancing will happen since there is no skewness across task buckets 
+ rebalancer.rebalance(); + + assertThat(getPartitionPositions(function, 6)) + .containsExactly( + new IntArrayList(ImmutableList.of(0, 3)), + new IntArrayList(ImmutableList.of(1, 4)), + new IntArrayList(ImmutableList.of(2, 5))); + assertThat(rebalancer.getPartitionAssignments()) + .containsExactly(ImmutableList.of(0), ImmutableList.of(1), ImmutableList.of(2), ImmutableList.of(0), ImmutableList.of(1), ImmutableList.of(2)); + } + + @Test + public void testNoRebalanceWhenDataWrittenIsLessThanTheRebalanceLimit() + { + int partitionCount = 3; + SkewedPartitionRebalancer rebalancer = createSkewedPartitionRebalancer(partitionCount, 3, 6, MIN_PARTITION_DATA_PROCESSED_REBALANCE_THRESHOLD); + SkewedPartitionFunction function = new SkewedPartitionFunction(new TestPartitionFunction(partitionCount), rebalancer); + + rebalancer.addPartitionRowCount(0, 1000); + rebalancer.addPartitionRowCount(1, 0); + rebalancer.addPartitionRowCount(2, 0); + rebalancer.addDataProcessed(DataSize.of(40, MEGABYTE).toBytes()); + // No rebalancing will happen since we do not cross the max data processed limit of 50MB + rebalancer.rebalance(); + + assertThat(getPartitionPositions(function, 6)) + .containsExactly( + new IntArrayList(ImmutableList.of(0, 3)), + new IntArrayList(ImmutableList.of(1, 4)), + new IntArrayList(ImmutableList.of(2, 5))); + assertThat(rebalancer.getPartitionAssignments()) + .containsExactly(ImmutableList.of(0), ImmutableList.of(1), ImmutableList.of(2)); + } + + @Test + public void testNoRebalanceWhenDataWrittenByThePartitionIsLessThanWriterMinSize() + { + int partitionCount = 3; + long minPartitionDataProcessedRebalanceThreshold = DataSize.of(50, MEGABYTE).toBytes(); + SkewedPartitionRebalancer rebalancer = createSkewedPartitionRebalancer(partitionCount, 3, 6, minPartitionDataProcessedRebalanceThreshold); + SkewedPartitionFunction function = new SkewedPartitionFunction(new TestPartitionFunction(partitionCount), rebalancer); + + rebalancer.addPartitionRowCount(0, 1000); + 
rebalancer.addPartitionRowCount(1, 600); + rebalancer.addPartitionRowCount(2, 0); + rebalancer.addDataProcessed(DataSize.of(60, MEGABYTE).toBytes()); + // No rebalancing will happen since no partition has crossed the writerMinSize limit of 50MB + rebalancer.rebalance(); + + assertThat(getPartitionPositions(function, 6)) + .containsExactly( + new IntArrayList(ImmutableList.of(0, 3)), + new IntArrayList(ImmutableList.of(1, 4)), + new IntArrayList(ImmutableList.of(2, 5))); + assertThat(rebalancer.getPartitionAssignments()) + .containsExactly(ImmutableList.of(0), ImmutableList.of(1), ImmutableList.of(2)); + } + + private List> getPartitionPositions(PartitionFunction function, int maxPosition) + { + List> partitionPositions = new ArrayList<>(); + for (int partition = 0; partition < function.getPartitionCount(); partition++) { + partitionPositions.add(new ArrayList<>()); + } + + for (int position = 0; position < maxPosition; position++) { + int partition = function.getPartition(dummyPage(), position); + partitionPositions.get(partition).add(position); + } + + return partitionPositions; + } + + private static Page dummyPage() + { + return SequencePageBuilder.createSequencePage(ImmutableList.of(BIGINT), 100, 0); + } + + private static class TestPartitionFunction + implements PartitionFunction + { + private final int partitionCount; + + private TestPartitionFunction(int partitionCount) + { + this.partitionCount = partitionCount; + } + + @Override + public int getPartitionCount() + { + return partitionCount; + } + + @Override + public int getPartition(Page page, int position) + { + return position % partitionCount; + } + } +} diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestInsert.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestInsert.java index 75ba685a9c8b..2edb528051e4 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestInsert.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestInsert.java @@ -36,7 +36,7 @@ 
import java.util.Optional; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; +import static io.trino.SystemSessionProperties.SCALE_WRITERS; import static io.trino.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT; import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; import static io.trino.SystemSessionProperties.TASK_WRITER_COUNT; @@ -251,44 +251,11 @@ public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session ses }; } - @Test - public void testCreateTableAsSelectWithPreferredPartitioningThreshold() - { - assertDistributedPlan( - "CREATE TABLE new_test_table_preferred_partitioning (column1, column2) AS SELECT * FROM (VALUES (1, 2)) t(column1, column2)", - withPreferredPartitioningThreshold(), - anyTree( - node(TableWriterNode.class, - // round robin - exchange(REMOTE, REPARTITION, ImmutableList.of(), ImmutableSet.of(), - values("column1", "column2"))))); - assertDistributedPlan( - "CREATE TABLE new_test_table_preferred_partitioning (column1, column2) AS SELECT * FROM (VALUES (1, 2), (3,4)) t(column1, column2)", - withPreferredPartitioningThreshold(), - anyTree( - node(TableWriterNode.class, - anyTree( - exchange(LOCAL, REPARTITION, ImmutableList.of(), ImmutableSet.of("column1"), - exchange(REMOTE, REPARTITION, ImmutableList.of(), ImmutableSet.of("column1"), - anyTree(values("column1", "column2")))))))); - } - private Session withForcedPreferredPartitioning() { return Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") - .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") - .setSystemProperty(TASK_PARTITIONED_WRITER_COUNT, "16") - .setSystemProperty(TASK_WRITER_COUNT, "16") - .build(); - } - - private Session withPreferredPartitioningThreshold() - { - return Session.builder(getQueryRunner().getDefaultSession()) - 
.setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "2") + .setSystemProperty(SCALE_WRITERS, "false") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") .setSystemProperty(TASK_PARTITIONED_WRITER_COUNT, "16") .setSystemProperty(TASK_WRITER_COUNT, "16") diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestApplyPreferredTableWriterPartitioning.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestApplyPreferredTableWriterPartitioning.java deleted file mode 100644 index 82146a0c1817..000000000000 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestApplyPreferredTableWriterPartitioning.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.sql.planner.iterative.rule; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.trino.Session; -import io.trino.cost.PlanNodeStatsEstimate; -import io.trino.cost.SymbolStatsEstimate; -import io.trino.sql.planner.Partitioning; -import io.trino.sql.planner.PartitioningScheme; -import io.trino.sql.planner.Symbol; -import io.trino.sql.planner.assertions.PlanMatchPattern; -import io.trino.sql.planner.iterative.rule.test.RuleAssert; -import io.trino.sql.planner.iterative.rule.test.RuleTester; -import io.trino.sql.planner.plan.PlanNodeId; -import io.trino.sql.planner.plan.ValuesNode; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import java.util.Optional; - -import static io.airlift.testing.Closeables.closeAllRuntimeException; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; -import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING; -import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; -import static io.trino.sql.planner.assertions.PlanMatchPattern.tableWriter; -import static io.trino.sql.planner.assertions.PlanMatchPattern.values; -import static io.trino.sql.planner.iterative.rule.test.RuleTester.defaultRuleTester; -import static io.trino.testing.TestingSession.testSessionBuilder; -import static java.lang.Double.NaN; - -public class TestApplyPreferredTableWriterPartitioning -{ - private static final String MOCK_CATALOG = "mock_catalog"; - private static final String TEST_SCHEMA = "test_schema"; - private static final String NODE_ID = "mock"; - private static final double NO_STATS = -1; - - private static final Session SESSION_WITHOUT_PREFERRED_PARTITIONING = testSessionBuilder() - .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "false") - 
.setCatalog(MOCK_CATALOG) - .setSchema(TEST_SCHEMA) - .build(); - private static final Session SESSION_WITH_PREFERRED_PARTITIONING_THRESHOLD_0 = testSessionBuilder() - .setCatalog(MOCK_CATALOG) - .setSchema(TEST_SCHEMA) - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") - .build(); - private static final Session SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD = testSessionBuilder() - .setCatalog(MOCK_CATALOG) - .setSchema(TEST_SCHEMA) - .build(); - - private static final PlanMatchPattern SUCCESSFUL_MATCH = tableWriter( - ImmutableList.of(), - ImmutableList.of(), - values(0)); - - private RuleTester tester; - - @BeforeClass - public void setUp() - { - tester = defaultRuleTester(); - } - - @AfterClass(alwaysRun = true) - public void tearDown() - { - closeAllRuntimeException(tester); - tester = null; - } - - @Test(dataProvider = "preferWritePartitioningDataProvider") - public void testPreferWritePartitioning(Session session, double distinctValuesStat, boolean match) - { - RuleAssert ruleAssert = assertPreferredPartitioning( - new PartitioningScheme( - Partitioning.create(FIXED_HASH_DISTRIBUTION, ImmutableList.of(new Symbol("col_one"))), - ImmutableList.of(new Symbol("col_one")))) - .withSession(session); - if (distinctValuesStat != NO_STATS) { - ruleAssert = ruleAssert.overrideStats(NODE_ID, PlanNodeStatsEstimate.builder() - .addSymbolStatistics(ImmutableMap.of( - new Symbol("col_one"), - new SymbolStatsEstimate(0, 0, 0, 0, distinctValuesStat))) - .build()); - } - if (match) { - ruleAssert.matches(SUCCESSFUL_MATCH); - } - else { - ruleAssert.doesNotFire(); - } - } - - @DataProvider(name = "preferWritePartitioningDataProvider") - public Object[][] preferWritePartitioningDataProvider() - { - return new Object[][] { - new Object[] {SESSION_WITHOUT_PREFERRED_PARTITIONING, NO_STATS, false}, - new Object[] {SESSION_WITHOUT_PREFERRED_PARTITIONING, NaN, false}, - new Object[] {SESSION_WITHOUT_PREFERRED_PARTITIONING, 1, false}, - new 
Object[] {SESSION_WITHOUT_PREFERRED_PARTITIONING, 50, false}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_THRESHOLD_0, NO_STATS, true}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_THRESHOLD_0, NaN, true}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_THRESHOLD_0, 1, true}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_THRESHOLD_0, 49, true}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_THRESHOLD_0, 50, true}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD, NO_STATS, false}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD, NaN, false}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD, 1, false}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD, 49, false}, - new Object[] {SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD, 50, true}, - }; - } - - @Test - public void testThresholdWithNullFraction() - { - // Null value in partition column should increase the number of partitions by 1 - PlanNodeStatsEstimate stats = PlanNodeStatsEstimate.builder() - .addSymbolStatistics(ImmutableMap.of(new Symbol("col_one"), new SymbolStatsEstimate(0, 0, .5, 0, 49))) - .build(); - - assertPreferredPartitioning(new PartitioningScheme( - Partitioning.create(FIXED_HASH_DISTRIBUTION, ImmutableList.of(new Symbol("col_one"))), - ImmutableList.of(new Symbol("col_one")))) - .withSession(SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD) - .overrideStats(NODE_ID, stats) - .matches(SUCCESSFUL_MATCH); - } - - @Test - public void testThresholdWithMultiplePartitions() - { - PlanNodeStatsEstimate stats = PlanNodeStatsEstimate.builder() - .addSymbolStatistics(ImmutableMap.of(new Symbol("col_one"), new SymbolStatsEstimate(0, 0, 0, 0, 5))) - .addSymbolStatistics(ImmutableMap.of(new Symbol("col_two"), new SymbolStatsEstimate(0, 0, 0, 0, 10))) - .build(); - - assertPreferredPartitioning(new PartitioningScheme( - Partitioning.create(FIXED_HASH_DISTRIBUTION, ImmutableList.of(new 
Symbol("col_one"), new Symbol("col_two"))), - ImmutableList.of(new Symbol("col_one"), new Symbol("col_two")))) - .withSession(SESSION_WITH_PREFERRED_PARTITIONING_DEFAULT_THRESHOLD) - .overrideStats(NODE_ID, stats) - .matches(SUCCESSFUL_MATCH); - } - - private RuleAssert assertPreferredPartitioning(PartitioningScheme preferredPartitioningScheme) - { - return tester.assertThat(new ApplyPreferredTableWriterPartitioning()) - .on(builder -> builder.tableWriter( - ImmutableList.of(), - ImmutableList.of(), - Optional.empty(), - Optional.of(preferredPartitioningScheme), - Optional.empty(), - Optional.empty(), - new ValuesNode(new PlanNodeId(NODE_ID), 0))); - } -} diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableExecuteSourceColumns.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableExecuteSourceColumns.java index cb10686aaf43..ba4b19d0b0a9 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableExecuteSourceColumns.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableExecuteSourceColumns.java @@ -80,7 +80,6 @@ public void testDoNotPrunePartitioningSchemeSymbols() ImmutableList.of(partition, hash), ImmutableList.of(partition), hash)), - Optional.empty(), p.values(a, partition, hash)); }) .doesNotFire(); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableWriterSourceColumns.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableWriterSourceColumns.java index 5946d76abe13..83582c3200a9 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableWriterSourceColumns.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPruneTableWriterSourceColumns.java @@ -85,7 +85,6 @@ public void testDoNotPrunePartitioningSchemeSymbols() hash)), Optional.empty(), Optional.empty(), - Optional.empty(), 
p.values(a, partition, hash)); }) .doesNotFire(); @@ -104,7 +103,6 @@ public void testDoNotPruneStatisticAggregationSymbols() ImmutableList.of(a), ImmutableList.of("column_a"), Optional.empty(), - Optional.empty(), Optional.of( p.statisticAggregations( ImmutableMap.of(aggregation, p.aggregation(PlanBuilder.expression("avg(argument)"), ImmutableList.of(BIGINT))), diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java index ededc71dbbac..6e0f03151ca3 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/test/PlanBuilder.java @@ -703,7 +703,6 @@ public TableFinishNode tableWithExchangeCreate(WriterTarget target, PlanNode sou ImmutableList.of(rowCountSymbol), ImmutableList.of("column_a"), Optional.empty(), - Optional.empty(), target, source, rowCountSymbol)) @@ -1139,14 +1138,13 @@ public ExceptNode except(ListMultimap outputsToInputs, List columns, List columnNames, PlanNode source) { - return tableWriter(columns, columnNames, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), source); + return tableWriter(columns, columnNames, Optional.empty(), Optional.empty(), Optional.empty(), source); } public TableWriterNode tableWriter( List columns, List columnNames, Optional partitioningScheme, - Optional preferredPartitioningScheme, TableWriterNode.WriterTarget target, PlanNode source, Symbol rowCountSymbol) @@ -1160,7 +1158,6 @@ public TableWriterNode tableWriter( columns, columnNames, partitioningScheme, - preferredPartitioningScheme, Optional.empty(), Optional.empty()); } @@ -1169,7 +1166,6 @@ public TableWriterNode tableWriter( List columns, List columnNames, Optional partitioningScheme, - Optional preferredPartitioningScheme, Optional statisticAggregations, Optional> statisticAggregationsDescriptor, 
PlanNode source) @@ -1183,21 +1179,19 @@ public TableWriterNode tableWriter( columns, columnNames, partitioningScheme, - preferredPartitioningScheme, statisticAggregations, statisticAggregationsDescriptor); } public TableExecuteNode tableExecute(List columns, List columnNames, PlanNode source) { - return tableExecute(columns, columnNames, Optional.empty(), Optional.empty(), source); + return tableExecute(columns, columnNames, Optional.empty(), source); } public TableExecuteNode tableExecute( List columns, List columnNames, Optional partitioningScheme, - Optional preferredPartitioningScheme, PlanNode source) { return new TableExecuteNode( @@ -1215,8 +1209,7 @@ public TableExecuteNode tableExecute( symbol("fragment", VARBINARY), columns, columnNames, - partitioningScheme, - preferredPartitioningScheme); + partitioningScheme); } public TableFunctionNode tableFunction( diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForPartitionedInsertAndMerge.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForPartitionedInsertAndMerge.java index b588b009e4b2..4544b8fcc1af 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForPartitionedInsertAndMerge.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForPartitionedInsertAndMerge.java @@ -34,7 +34,6 @@ import java.util.Optional; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.SCALE_WRITERS; import static io.trino.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT; import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; @@ -78,7 +77,6 @@ protected LocalQueryRunner createLocalQueryRunner() .setSchema("mock") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") .setSystemProperty(SCALE_WRITERS, "false") - 
.setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") .build(); LocalQueryRunner queryRunner = LocalQueryRunner.create(session); queryRunner.createCatalog("mock_merge_and_insert", createMergeConnectorFactory(), ImmutableMap.of()); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java index 7ae677c86977..8433806e43c0 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddLocalExchangesForTaskScaleWriters.java @@ -34,9 +34,9 @@ import java.util.Optional; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.SCALE_WRITERS; import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; +import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING; import static io.trino.spi.statistics.TableStatistics.empty; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; @@ -220,7 +220,7 @@ public void testLocalScaledPartitionedWriterWithoutSupportsForReportingWrittenBy testSessionBuilder() .setCatalog(catalogName) .setSchema("mock") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled)) .setSystemProperty(SCALE_WRITERS, "false") .build(), @@ -239,7 +239,7 @@ public void testLocalScaledPartitionedWriterWithoutSupportsForReportingWrittenBy testSessionBuilder() .setCatalog(catalogName) .setSchema("mock") - 
.setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled)) .setSystemProperty(SCALE_WRITERS, "false") .build(), @@ -273,9 +273,11 @@ public void testLocalScaledPartitionedWriterWithoutSupportForReportingWrittenByt tableWriter( ImmutableList.of("customer", "year"), ImmutableList.of("customer", "year"), - exchange(LOCAL, GATHER, SINGLE_DISTRIBUTION, - exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, - tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + project( + exchange(LOCAL, REPARTITION, FIXED_HASH_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, + project( + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))))); assertDistributedPlan( "INSERT INTO connector_partitioned_table SELECT * FROM source_table", @@ -309,7 +311,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre .setCatalog("mock_report_written_bytes_with_multiple_writer_per_partition") .setSchema("mock") // Enforce preferred partitioning - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true") .setSystemProperty(SCALE_WRITERS, "false") .build(), @@ -329,7 +331,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre .setCatalog("mock_report_written_bytes_with_multiple_writer_per_partition") .setSchema("mock") // Enforce preferred partitioning - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") .setSystemProperty(SCALE_WRITERS, "false") .build(), @@ -408,7 +410,7 @@ public void 
testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni ImmutableList.of("customer", "year"), project( exchange(LOCAL, REPARTITION, SCALED_WRITER_HASH_DISTRIBUTION, - exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, project( tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))))); @@ -424,8 +426,10 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni tableWriter( ImmutableList.of("customer", "year"), ImmutableList.of("customer", "year"), - exchange(LOCAL, GATHER, SINGLE_DISTRIBUTION, - exchange(REMOTE, REPARTITION, FIXED_ARBITRARY_DISTRIBUTION, - tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))); + project( + exchange(LOCAL, REPARTITION, FIXED_HASH_DISTRIBUTION, + exchange(REMOTE, REPARTITION, FIXED_HASH_DISTRIBUTION, + project( + tableScan("source_table", ImmutableMap.of("customer", "customer", "year", "year"))))))))); } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java index 90f301f3e8c6..1cd9ee45903f 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestLimitMaxWriterNodesCount.java @@ -41,10 +41,10 @@ import java.util.OptionalInt; import static io.trino.SystemSessionProperties.MAX_WRITER_TASKS_COUNT; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.REDISTRIBUTE_WRITES; import static io.trino.SystemSessionProperties.RETRY_POLICY; import static io.trino.SystemSessionProperties.SCALE_WRITERS; +import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING; import static 
io.trino.spi.connector.TableProcedureExecutionMode.distributedWithFilteringAndRepartitioning; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; @@ -210,7 +210,7 @@ public void testPlanWhenInsertToPartitionedTablePreferredPartitioningEnabled() Session session = Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(MAX_WRITER_TASKS_COUNT, "2") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setCatalog(catalogName) .build(); @@ -255,7 +255,7 @@ public void testPlanWhenMaxWriterTasksSpecified() Session session = Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(MAX_WRITER_TASKS_COUNT, "2") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setCatalog(catalogNameWithMaxWriterTasksSpecified) .build(); @@ -279,7 +279,7 @@ public void testPlanWhenRetryPolicyIsTask() Session session = Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(MAX_WRITER_TASKS_COUNT, "2") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setSystemProperty(RETRY_POLICY, "TASK") .setCatalog(catalogNameWithMaxWriterTasksSpecified) .build(); @@ -368,7 +368,7 @@ public void testPlanWhenTableExecuteToPartitionedTablePreferredPartitioningEnabl Session session = Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(MAX_WRITER_TASKS_COUNT, "2") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setCatalog(catalogName) .build(); @@ -392,7 +392,7 @@ public void testPlanTableExecuteWhenMaxWriterTasksSpecified() Session session = 
Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(MAX_WRITER_TASKS_COUNT, "2") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setCatalog(catalogNameWithMaxWriterTasksSpecified) .build(); @@ -416,7 +416,7 @@ public void testPlanTableExecuteWhenRetryPolicyIsTask() Session session = Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(MAX_WRITER_TASKS_COUNT, "2") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") .setSystemProperty(RETRY_POLICY, "TASK") .setCatalog(catalogNameWithMaxWriterTasksSpecified) .build(); diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/planprinter/TestJsonRepresentation.java b/core/trino-main/src/test/java/io/trino/sql/planner/planprinter/TestJsonRepresentation.java index 74c42ca9dd51..8515ecec3301 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/planprinter/TestJsonRepresentation.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/planprinter/TestJsonRepresentation.java @@ -101,7 +101,7 @@ public void testDistributedJsonPlan() ImmutableList.of(), ImmutableList.of(new PlanNodeStatsAndCostSummary(10, 90, 90, 0, 0)), ImmutableList.of(new JsonRenderedNode( - "149", + "147", "LocalExchange", ImmutableMap.of( "partitioning", "SINGLE", @@ -144,7 +144,7 @@ public void testLogicalJsonPlan() ImmutableList.of(), ImmutableList.of(new PlanNodeStatsAndCostSummary(10, 90, 90, 0, 0)), ImmutableList.of(new JsonRenderedNode( - "149", + "147", "LocalExchange", ImmutableMap.of( "partitioning", "SINGLE", diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java index 1bd4c6f225c6..2958c1e363aa 100644 --- 
a/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/sanity/TestValidateScaledWritersUsage.java @@ -239,7 +239,6 @@ public void testScaledWritersTwoTableWritersNodes(PartitioningHandle scaledWrite ImmutableList.of(symbol), ImmutableList.of("column_a"), Optional.empty(), - Optional.empty(), planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true, true), planBuilder.exchange(innerExchange -> innerExchange diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index 9a47a1d82b08..f05c0509a44d 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -1737,7 +1737,6 @@ public void testOptimizeWithEnforcedRepartitioning() .setCatalog(getQueryRunner().getDefaultSession().getCatalog()) .setSchema(getQueryRunner().getDefaultSession().getSchema()) .setSystemProperty("use_preferred_write_partitioning", "true") - .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "1") .build(); String tableName = "test_optimize_partitioned_table_" + randomNameSuffix(); String tableLocation = getLocationForTable(bucketName, tableName); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePreferredPartitioning.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePreferredPartitioning.java index f3bcc8d092e7..ad3b7d3f2201 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePreferredPartitioning.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePreferredPartitioning.java @@ -22,7 +22,6 @@ 
import org.testng.annotations.Test; import static com.google.common.base.Verify.verify; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT; import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; @@ -135,7 +134,6 @@ private Session withForcedPreferredPartitioning() { return Session.builder(getQueryRunner().getDefaultSession()) .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "true") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") // It is important to explicitly set partitioned writer count to 1 since in above tests we are testing // the open writers limit for partitions. So, with default value of 32 writer count, we will never // hit that limit thus, tests will fail. diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 869a782b8069..7b546d2d3f32 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -112,7 +112,6 @@ import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE; import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE; import static io.trino.SystemSessionProperties.MAX_WRITER_TASKS_COUNT; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.REDISTRIBUTE_WRITES; import static io.trino.SystemSessionProperties.SCALE_WRITERS; import static io.trino.SystemSessionProperties.TASK_SCALE_WRITERS_ENABLED; @@ -4112,30 +4111,30 @@ public 
Object[][] prepareScaledWritersOption() @Test(dataProvider = "taskWritersLimitParams") public void testWriterTasksCountLimitUnpartitioned(boolean scaleWriters, boolean redistributeWrites, int expectedFilesCount) { - testLimitWriterTasks(2, expectedFilesCount, scaleWriters, redistributeWrites, false); + testLimitWriterTasks(2, expectedFilesCount, scaleWriters, redistributeWrites, false, DataSize.of(1, MEGABYTE)); } @Test public void testWriterTasksCountLimitPartitionedScaleWritersDisabled() { - testLimitWriterTasks(2, 2, false, true, true); + testLimitWriterTasks(2, 2, false, true, true, DataSize.of(1, MEGABYTE)); } @Test public void testWriterTasksCountLimitPartitionedScaleWritersEnabled() { - testLimitWriterTasks(2, 2, true, true, true); + testLimitWriterTasks(2, 4, true, true, true, DataSize.of(1, MEGABYTE)); + testLimitWriterTasks(2, 2, true, true, true, DataSize.of(32, MEGABYTE)); } - private void testLimitWriterTasks(int maxWriterTasks, int expectedFilesCount, boolean scaleWritersEnabled, boolean redistributeWrites, boolean partitioned) + private void testLimitWriterTasks(int maxWriterTasks, int expectedFilesCount, boolean scaleWritersEnabled, boolean redistributeWrites, boolean partitioned, DataSize writerMinSize) { Session session = Session.builder(getSession()) .setSystemProperty(SCALE_WRITERS, Boolean.toString(scaleWritersEnabled)) .setSystemProperty(MAX_WRITER_TASKS_COUNT, Integer.toString(maxWriterTasks)) .setSystemProperty(REDISTRIBUTE_WRITES, Boolean.toString(redistributeWrites)) .setSystemProperty(TASK_WRITER_COUNT, "1") - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") - .setSystemProperty(WRITER_MIN_SIZE, "1MB") + .setSystemProperty(WRITER_MIN_SIZE, writerMinSize.toString()) .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false") .build(); String tableName = "writing_tasks_limit_%s".formatted(randomNameSuffix()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 615722e30de2..e93cd9d1a523 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -101,10 +101,10 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.MoreCollectors.onlyElement; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.SCALE_WRITERS; import static io.trino.SystemSessionProperties.TASK_PARTITIONED_WRITER_COUNT; import static io.trino.SystemSessionProperties.TASK_WRITER_COUNT; +import static io.trino.SystemSessionProperties.USE_PREFERRED_WRITE_PARTITIONING; import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; import static io.trino.plugin.hive.HiveTestUtils.SESSION; import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; @@ -4545,16 +4545,10 @@ public void testRepartitionDataOnInsert(Session session, String partitioning, in public Object[][] repartitioningDataProvider() { Session defaultSession = getSession(); - // For identity-only partitioning, Iceberg connector returns ConnectorTableLayout with partitionColumns set, but without partitioning. - // This is treated by engine as "preferred", but not mandatory partitioning, and gets ignored if stats suggest number of partitions - // written is low. Without partitioning, number of files created is nondeterministic, as a writer (worker node) may or may not receive data. 
- Session obeyConnectorPartitioning = Session.builder(defaultSession) - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "1") - .build(); return new Object[][] { // identity partitioning column - {obeyConnectorPartitioning, "'orderstatus'", 3}, + {defaultSession, "'orderstatus'", 3}, // bucketing {defaultSession, "'bucket(custkey, 13)'", 13}, // varchar-based @@ -4580,17 +4574,14 @@ public void testStatsBasedRepartitionDataOnInsert() private void testStatsBasedRepartitionData(boolean ctas) { - Session sessionRepartitionSmall = Session.builder(getSession()) - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "2") - .build(); Session sessionRepartitionMany = Session.builder(getSession()) - .setSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, "5") .setSystemProperty(SCALE_WRITERS, "false") + .setSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, "false") .build(); // Use DISTINCT to add data redistribution between source table and the writer. This makes it more likely that all writers get some data. 
String sourceRelation = "(SELECT DISTINCT orderkey, custkey, orderstatus FROM tpch.tiny.orders)"; testRepartitionData( - sessionRepartitionSmall, + getSession(), sourceRelation, ctas, "'orderstatus'", @@ -5051,7 +5042,6 @@ public void testOptimizeForPartitionedTable(int formatVersion) .setCatalog(getQueryRunner().getDefaultSession().getCatalog()) .setSchema(getQueryRunner().getDefaultSession().getSchema()) .setSystemProperty("use_preferred_write_partitioning", "true") - .setSystemProperty("preferred_write_partitioning_min_number_of_partitions", "100") .build(); String tableName = "test_repartitiong_during_optimize_" + randomNameSuffix(); assertUpdate(session, "CREATE TABLE " + tableName + " (key varchar, value integer) WITH (format_version = " + formatVersion + ", partitioning = ARRAY['key'])"); diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java index 886e49d9ec53..7f659e40a094 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java @@ -1173,7 +1173,7 @@ public void testAnonymizedJsonPlan() ImmutableList.of(), ImmutableList.of(), ImmutableList.of(new JsonRenderedNode( - "173", + "171", "LocalExchange", ImmutableMap.of( "partitioning", "[connectorHandleType = SystemPartitioningHandle, partitioning = SINGLE, function = SINGLE]", @@ -1184,7 +1184,7 @@ public void testAnonymizedJsonPlan() ImmutableList.of(), ImmutableList.of(), ImmutableList.of(new JsonRenderedNode( - "140", + "138", "RemoteSource", ImmutableMap.of("sourceFragmentIds", "[1]"), ImmutableList.of(typedSymbol("symbol_1", "double")), @@ -1192,7 +1192,7 @@ public void testAnonymizedJsonPlan() ImmutableList.of(), ImmutableList.of()))))))), "1", new JsonRenderedNode( - "139", + "137", "LimitPartial", ImmutableMap.of( "count", "10",