Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_STAGE_COUNT = "query_max_stage_count";
public static final String REDISTRIBUTE_WRITES = "redistribute_writes";
public static final String USE_PREFERRED_WRITE_PARTITIONING = "use_preferred_write_partitioning";
public static final String PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS = "preferred_write_partitioning_min_number_of_partitions";
public static final String SCALE_WRITERS = "scale_writers";
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
public static final String MAX_WRITER_TASKS_COUNT = "max_writer_tasks_count";
Expand Down Expand Up @@ -306,16 +305,6 @@ public SystemSessionProperties(
"Use preferred write partitioning",
optimizerConfig.isUsePreferredWritePartitioning(),
false),
integerProperty(
PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS,
"Use preferred write partitioning when the number of written partitions exceeds the configured threshold",
optimizerConfig.getPreferredWritePartitioningMinNumberOfPartitions(),
value -> {
if (value < 1) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be greater than or equal to 1: %s", PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, value));
}
},
false),
booleanProperty(
SCALE_WRITERS,
"Scale out writers based on throughput (use minimum necessary)",
Expand Down Expand Up @@ -1093,11 +1082,6 @@ public static boolean isUsePreferredWritePartitioning(Session session)
return session.getSystemProperty(USE_PREFERRED_WRITE_PARTITIONING, Boolean.class);
}

public static int getPreferredWritePartitioningMinNumberOfPartitions(Session session)
{
return session.getSystemProperty(PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS, Integer.class);
}

public static boolean isScaleWriters(Session session)
{
return session.getSystemProperty(SCALE_WRITERS, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class PagePartitioner
private final PageSerializer serializer;
private final PositionsAppenderPageBuilder[] positionsAppenders;
private final boolean replicatesAnyRow;
private final boolean partitionProcessRleAndDictionaryBlocks;
private final int nullChannel; // when >= 0, send the position to every partition if this channel is null
private PartitionedOutputInfoSupplier partitionedOutputInfoSupplier;

Expand All @@ -86,7 +87,8 @@ public PagePartitioner(
DataSize maxMemory,
PositionsAppenderFactory positionsAppenderFactory,
Optional<Slice> exchangeEncryptionKey,
AggregatedMemoryContext aggregatedMemoryContext)
AggregatedMemoryContext aggregatedMemoryContext,
boolean partitionProcessRleAndDictionaryBlocks)
{
this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null");
this.partitionChannels = Ints.toArray(requireNonNull(partitionChannels, "partitionChannels is null"));
Expand All @@ -104,6 +106,7 @@ public PagePartitioner(
this.nullChannel = nullChannel.orElse(-1);
this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null");
this.serializer = serdeFactory.createSerializer(exchangeEncryptionKey.map(Ciphers::deserializeAesEncryptionKey));
this.partitionProcessRleAndDictionaryBlocks = partitionProcessRleAndDictionaryBlocks;

// Ensure partition channels align with constant arguments provided
for (int i = 0; i < this.partitionChannels.length; i++) {
Expand All @@ -125,6 +128,11 @@ public PagePartitioner(
updateMemoryUsage();
}

public PartitionFunction getPartitionFunction()
{
return partitionFunction;
}

// sets up this partitioner for the new operator
public void setupOperator(OperatorContext operatorContext)
{
Expand Down Expand Up @@ -232,12 +240,12 @@ private IntArrayList[] partitionPositions(Page page)

Page partitionFunctionArgs = getPartitionFunctionArguments(page);

if (partitionFunctionArgs.getChannelCount() > 0 && onlyRleBlocks(partitionFunctionArgs)) {
if (partitionProcessRleAndDictionaryBlocks && partitionFunctionArgs.getChannelCount() > 0 && onlyRleBlocks(partitionFunctionArgs)) {
// we need at least one Rle block since with no blocks partition function
// can return a different value per invocation (e.g. RoundRobinBucketFunction)
partitionBySingleRleValue(page, position, partitionFunctionArgs, partitionPositions);
}
else if (partitionFunctionArgs.getChannelCount() == 1 && isDictionaryProcessingFaster(partitionFunctionArgs.getBlock(0))) {
else if (partitionProcessRleAndDictionaryBlocks && partitionFunctionArgs.getChannelCount() == 1 && isDictionaryProcessingFaster(partitionFunctionArgs.getBlock(0))) {
partitionBySingleDictionary(page, position, partitionFunctionArgs, partitionPositions);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static class PartitionedOutputFactory
private final Optional<Slice> exchangeEncryptionKey;
private final AggregatedMemoryContext memoryContext;
private final int pagePartitionerPoolSize;
private final Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer;

public PartitionedOutputFactory(
PartitionFunction partitionFunction,
Expand All @@ -72,7 +73,8 @@ public PartitionedOutputFactory(
PositionsAppenderFactory positionsAppenderFactory,
Optional<Slice> exchangeEncryptionKey,
AggregatedMemoryContext memoryContext,
int pagePartitionerPoolSize)
int pagePartitionerPoolSize,
Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer)
{
this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null");
this.partitionChannels = requireNonNull(partitionChannels, "partitionChannels is null");
Expand All @@ -85,6 +87,7 @@ public PartitionedOutputFactory(
this.exchangeEncryptionKey = requireNonNull(exchangeEncryptionKey, "exchangeEncryptionKey is null");
this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
this.pagePartitionerPoolSize = pagePartitionerPoolSize;
this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null");
}

@Override
Expand All @@ -111,7 +114,8 @@ public OperatorFactory createOutputOperator(
positionsAppenderFactory,
exchangeEncryptionKey,
memoryContext,
pagePartitionerPoolSize);
pagePartitionerPoolSize,
skewedPartitionRebalancer);
}
}

Expand All @@ -134,6 +138,7 @@ public static class PartitionedOutputOperatorFactory
private final Optional<Slice> exchangeEncryptionKey;
private final AggregatedMemoryContext memoryContext;
private final int pagePartitionerPoolSize;
private final Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer;
private final PagePartitionerPool pagePartitionerPool;

public PartitionedOutputOperatorFactory(
Expand All @@ -152,7 +157,8 @@ public PartitionedOutputOperatorFactory(
PositionsAppenderFactory positionsAppenderFactory,
Optional<Slice> exchangeEncryptionKey,
AggregatedMemoryContext memoryContext,
int pagePartitionerPoolSize)
int pagePartitionerPoolSize,
Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -170,21 +176,33 @@ public PartitionedOutputOperatorFactory(
this.exchangeEncryptionKey = requireNonNull(exchangeEncryptionKey, "exchangeEncryptionKey is null");
this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
this.pagePartitionerPoolSize = pagePartitionerPoolSize;
this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null");
this.pagePartitionerPool = new PagePartitionerPool(
pagePartitionerPoolSize,
() -> new PagePartitioner(
partitionFunction,
partitionChannels,
partitionConstants,
replicatesAnyRow,
nullChannel,
outputBuffer,
serdeFactory,
sourceTypes,
maxMemory,
positionsAppenderFactory,
exchangeEncryptionKey,
memoryContext));
() -> {
boolean partitionProcessRleAndDictionaryBlocks = true;
PartitionFunction function = partitionFunction;
if (skewedPartitionRebalancer.isPresent()) {
function = new SkewedPartitionFunction(partitionFunction, skewedPartitionRebalancer.get());
// Partition flattened Rle and Dictionary blocks since if they are scaled then we want to
// round-robin the entire block to increase the writing parallelism across tasks/workers.
partitionProcessRleAndDictionaryBlocks = false;
}
return new PagePartitioner(
function,
partitionChannels,
partitionConstants,
replicatesAnyRow,
nullChannel,
outputBuffer,
serdeFactory,
sourceTypes,
maxMemory,
positionsAppenderFactory,
exchangeEncryptionKey,
memoryContext,
partitionProcessRleAndDictionaryBlocks);
});
}

@Override
Expand All @@ -195,7 +213,8 @@ public Operator createOperator(DriverContext driverContext)
operatorContext,
pagePreprocessor,
outputBuffer,
pagePartitionerPool);
pagePartitionerPool,
skewedPartitionRebalancer);
}

@Override
Expand Down Expand Up @@ -223,14 +242,16 @@ public OperatorFactory duplicate()
positionsAppenderFactory,
exchangeEncryptionKey,
memoryContext,
pagePartitionerPoolSize);
pagePartitionerPoolSize,
skewedPartitionRebalancer);
}
}

private final OperatorContext operatorContext;
private final Function<Page, Page> pagePreprocessor;
private final PagePartitionerPool pagePartitionerPool;
private final PagePartitioner partitionFunction;
private final PagePartitioner pagePartitioner;
private final Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer;
// outputBuffer is used only to block the operator from finishing if the outputBuffer is full
private final OutputBuffer outputBuffer;
private ListenableFuture<Void> isBlocked = NOT_BLOCKED;
Expand All @@ -240,14 +261,16 @@ public PartitionedOutputOperator(
OperatorContext operatorContext,
Function<Page, Page> pagePreprocessor,
OutputBuffer outputBuffer,
PagePartitionerPool pagePartitionerPool)
PagePartitionerPool pagePartitionerPool,
Optional<SkewedPartitionRebalancer> skewedPartitionRebalancer)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.pagePreprocessor = requireNonNull(pagePreprocessor, "pagePreprocessor is null");
this.pagePartitionerPool = requireNonNull(pagePartitionerPool, "pagePartitionerPool is null");
this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null");
this.partitionFunction = requireNonNull(pagePartitionerPool.poll(), "partitionFunction is null");
this.partitionFunction.setupOperator(operatorContext);
this.pagePartitioner = requireNonNull(pagePartitionerPool.poll(), "pagePartitioner is null");
this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null");
this.pagePartitioner.setupOperator(operatorContext);
}

@Override
Expand All @@ -260,7 +283,7 @@ public OperatorContext getOperatorContext()
public void finish()
{
if (!finished) {
pagePartitionerPool.release(partitionFunction);
pagePartitionerPool.release(pagePartitioner);
finished = true;
}
}
Expand Down Expand Up @@ -309,7 +332,22 @@ public void addInput(Page page)
}

page = pagePreprocessor.apply(page);
partitionFunction.partitionPage(page);
pagePartitioner.partitionPage(page);

// Rebalance skewed partitions in the case of scale writer hash partitioning
if (skewedPartitionRebalancer.isPresent()) {
SkewedPartitionRebalancer rebalancer = skewedPartitionRebalancer.get();

// Update data processed and partitionRowCount state
rebalancer.addDataProcessed(page.getSizeInBytes());
((SkewedPartitionFunction) pagePartitioner.getPartitionFunction()).flushPartitionRowCountToRebalancer();

// Rebalance only when output buffer is full. This resembles that the downstream writing stage is slow, and
// we could rebalance partitions to increase the concurrency at downstream stage.
if (!outputBuffer.isFull().isDone()) {
rebalancer.rebalance();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.trino.operator.output;

import io.trino.operator.PartitionFunction;
import io.trino.spi.Page;

import static java.util.Objects.requireNonNull;

public class SkewedPartitionFunction
implements PartitionFunction
{
private final PartitionFunction partitionFunction;
private final SkewedPartitionRebalancer skewedPartitionRebalancer;

private final long[] partitionRowCount;

public SkewedPartitionFunction(PartitionFunction partitionFunction, SkewedPartitionRebalancer skewedPartitionRebalancer)
{
this.partitionFunction = requireNonNull(partitionFunction, "partitionFunction is null");
this.skewedPartitionRebalancer = requireNonNull(skewedPartitionRebalancer, "skewedPartitionRebalancer is null");

this.partitionRowCount = new long[partitionFunction.getPartitionCount()];
}

@Override
public int getPartitionCount()
{
return skewedPartitionRebalancer.getTaskCount();
}

@Override
public int getPartition(Page page, int position)
{
int partition = partitionFunction.getPartition(page, position);
return skewedPartitionRebalancer.getTaskId(partition, partitionRowCount[partition]++);
}

public void flushPartitionRowCountToRebalancer()
{
for (int partition = 0; partition < partitionFunction.getPartitionCount(); partition++) {
skewedPartitionRebalancer.addPartitionRowCount(partition, partitionRowCount[partition]);
partitionRowCount[partition] = 0;
}
}
}
Loading