Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions core/trino-main/src/main/java/io/trino/FeaturesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.sql.analyzer.RegexLibrary.JONI;

@DefunctConfig({
Expand Down Expand Up @@ -71,7 +72,7 @@ public class FeaturesConfig

private boolean redistributeWrites = true;
private boolean scaleWriters = true;
private DataSize writerMinSize = DataSize.of(32, DataSize.Unit.MEGABYTE);
private DataSize writerScalingMinDataProcessed = DataSize.of(120, DataSize.Unit.MEGABYTE);
private DataIntegrityVerification exchangeDataIntegrityVerification = DataIntegrityVerification.ABORT;
/**
* default value is overwritten for fault tolerant execution in {@link #applyFaultTolerantExecutionDefaults()}
Expand Down Expand Up @@ -153,16 +154,25 @@ public FeaturesConfig setScaleWriters(boolean scaleWriters)
}

@NotNull
public DataSize getWriterMinSize()
public DataSize getWriterScalingMinDataProcessed()
{
return writerMinSize;
return writerScalingMinDataProcessed;
}

@Config("writer-scaling-min-data-processed")
@ConfigDescription("Minimum amount of uncompressed output data processed by writers before writer scaling can happen")
public FeaturesConfig setWriterScalingMinDataProcessed(DataSize writerScalingMinDataProcessed)
{
    this.writerScalingMinDataProcessed = writerScalingMinDataProcessed;
    return this;
}

@Deprecated
@LegacyConfig(value = "writer-min-size", replacedBy = "writer-scaling-min-data-processed")
@ConfigDescription("Target minimum size of writer output when scaling writers")
public FeaturesConfig setWriterMinSize(DataSize writerMinSize)
{
    // Legacy knob is mapped onto the replacement by doubling its byte value.
    // NOTE(review): presumably 2x converts written-output size to uncompressed
    // writer-input size — confirm against the writer-scaling change discussion.
    this.writerScalingMinDataProcessed = succinctBytes(writerMinSize.toBytes() * 2);
    return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.OptionalInt;
Comment thread
gaurav8297 marked this conversation as resolved.
Outdated

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.execution.QueryManagerConfig.FAULT_TOLERANT_EXECUTION_MAX_PARTITION_COUNT_LIMIT;
import static io.trino.execution.QueryManagerConfig.MAX_TASK_RETRY_ATTEMPTS;
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
Expand Down Expand Up @@ -84,7 +85,8 @@ public final class SystemSessionProperties
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
public static final String MAX_WRITER_TASKS_COUNT = "max_writer_tasks_count";
public static final String TASK_SCALE_WRITERS_MAX_WRITER_COUNT = "task_scale_writers_max_writer_count";
public static final String WRITER_MIN_SIZE = "writer_min_size";
public static final String WRITER_SCALING_MIN_DATA_PROCESSED = "writer_scaling_min_data_processed";
public static final String SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = "skewed_partition_min_data_processed_rebalance_threshold";
public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union";
public static final String EXECUTION_POLICY = "execution_policy";
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
Expand Down Expand Up @@ -334,10 +336,15 @@ public SystemSessionProperties(
taskManagerConfig.getScaleWritersMaxWriterCount(),
true),
dataSizeProperty(
WRITER_MIN_SIZE,
"Target minimum size of writer output when scaling writers",
featuresConfig.getWriterMinSize(),
WRITER_SCALING_MIN_DATA_PROCESSED,
"Minimum amount of uncompressed output data processed by writers before writer scaling can happen",
featuresConfig.getWriterScalingMinDataProcessed(),
false),
dataSizeProperty(
SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD,
"Minimum data processed to trigger skewed partition rebalancing in local and remote exchange",
DataSize.of(50, MEGABYTE),
true),
booleanProperty(
PUSH_TABLE_WRITE_THROUGH_UNION,
"Parallelize writes when using UNION ALL in queries that write data",
Expand Down Expand Up @@ -1149,9 +1156,14 @@ public static int getMaxWriterTaskCount(Session session)
return session.getSystemProperty(MAX_WRITER_TASKS_COUNT, Integer.class);
}

public static DataSize getWriterMinSize(Session session)
public static DataSize getWriterScalingMinDataProcessed(Session session)
{
return session.getSystemProperty(WRITER_SCALING_MIN_DATA_PROCESSED, DataSize.class);
}

// Returns the session value of skewed_partition_min_data_processed_rebalance_threshold.
public static DataSize getSkewedPartitionMinDataProcessedRebalanceThreshold(Session session)
{
    return session.getSystemProperty(SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD, DataSize.class);
}

public static boolean isPushTableWriteThroughUnion(Session session)
Expand Down
4 changes: 4 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ private TaskStatus createTaskStatus(TaskHolder taskHolder)
int runningPartitionedDrivers = 0;
long runningPartitionedSplitsWeight = 0L;
DataSize outputDataSize = DataSize.ofBytes(0);
DataSize writerInputDataSize = DataSize.ofBytes(0);
DataSize physicalWrittenDataSize = DataSize.ofBytes(0);
Optional<Integer> writerCount = Optional.empty();
DataSize userMemoryReservation = DataSize.ofBytes(0);
Expand All @@ -337,6 +338,7 @@ private TaskStatus createTaskStatus(TaskHolder taskHolder)
queuedPartitionedSplitsWeight = taskStats.getQueuedPartitionedSplitsWeight();
runningPartitionedDrivers = taskStats.getRunningPartitionedDrivers();
runningPartitionedSplitsWeight = taskStats.getRunningPartitionedSplitsWeight();
writerInputDataSize = taskStats.getWriterInputDataSize();
physicalWrittenDataSize = taskStats.getPhysicalWrittenDataSize();
writerCount = taskStats.getMaxWriterCount();
userMemoryReservation = taskStats.getUserMemoryReservation();
Expand All @@ -358,6 +360,7 @@ else if (taskHolder.getTaskExecution() != null) {
runningPartitionedSplitsWeight += pipelineStatus.getRunningPartitionedSplitsWeight();
physicalWrittenBytes += pipelineContext.getPhysicalWrittenDataSize();
}
writerInputDataSize = succinctBytes(taskContext.getWriterInputDataSize());
physicalWrittenDataSize = succinctBytes(physicalWrittenBytes);
writerCount = taskContext.getMaxWriterCount();
userMemoryReservation = taskContext.getMemoryReservation();
Expand All @@ -382,6 +385,7 @@ else if (taskHolder.getTaskExecution() != null) {
runningPartitionedDrivers,
outputBuffer.getStatus(),
outputDataSize,
writerInputDataSize,
physicalWrittenDataSize,
writerCount,
userMemoryReservation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ public class TaskManagerConfig
private Duration interruptStuckSplitTasksDetectionInterval = new Duration(2, TimeUnit.MINUTES);
Comment thread
gaurav8297 marked this conversation as resolved.
Outdated
Comment thread
gaurav8297 marked this conversation as resolved.
Outdated

private boolean scaleWritersEnabled = true;
// Set the value of default max writer count to the number of processors and cap it to 32. We can do this
// Set the value of default max writer count to 2 * min(the number of processors, 32). We can do this
// because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
// use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high
// resource utilization.
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32);
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
private int writerCount = 1;
// Default value of partitioned task writer count should be above 1, otherwise it can create a plan
// with a single gather exchange node on the coordinator due to a single available processor. Whereas,
// on the worker nodes due to more available processors, the default value could be above 1. Therefore,
Comment thread
gaurav8297 marked this conversation as resolved.
Outdated
Comment thread
gaurav8297 marked this conversation as resolved.
Outdated
// it can cause error due to config mismatch during execution. Additionally, cap it to 32 (before doubling) in order to
// avoid small pages produced by local partitioning exchanges.
private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2;
// Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather
// exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to
// more available processors, the default value could be above 1. Therefore, it can cause error due to config
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/TaskStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class TaskStatus
private final int runningPartitionedDrivers;
private final long runningPartitionedSplitsWeight;
private final OutputBufferStatus outputBufferStatus;
private final DataSize writerInputDataSize;
private final DataSize outputDataSize;
private final DataSize physicalWrittenDataSize;
private final Optional<Integer> maxWriterCount;
Expand Down Expand Up @@ -87,6 +88,7 @@ public TaskStatus(
@JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers,
@JsonProperty("outputBufferStatus") OutputBufferStatus outputBufferStatus,
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("writerInputDataSize") DataSize writerInputDataSize,
@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,
@JsonProperty("writerCount") Optional<Integer> maxWriterCount,
@JsonProperty("memoryReservation") DataSize memoryReservation,
Expand Down Expand Up @@ -121,6 +123,7 @@ public TaskStatus(
this.outputBufferStatus = requireNonNull(outputBufferStatus, "outputBufferStatus is null");
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");

this.writerInputDataSize = requireNonNull(writerInputDataSize, "writerInputDataSize is null");
this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");
this.maxWriterCount = requireNonNull(maxWriterCount, "maxWriterCount is null");

Expand Down Expand Up @@ -196,6 +199,12 @@ public int getRunningPartitionedDrivers()
return runningPartitionedDrivers;
}

@JsonProperty
public DataSize getWriterInputDataSize()
{
    // Amount of writer input data reported for this task.
    return this.writerInputDataSize;
}

@JsonProperty
public DataSize getPhysicalWrittenDataSize()
{
Expand Down Expand Up @@ -293,6 +302,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n
OutputBufferStatus.initial(),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
Optional.empty(),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
Expand All @@ -319,6 +329,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List<E
taskStatus.getRunningPartitionedDrivers(),
taskStatus.getOutputBufferStatus(),
taskStatus.getOutputDataSize(),
taskStatus.getWriterInputDataSize(),
taskStatus.getPhysicalWrittenDataSize(),
taskStatus.getMaxWriterCount(),
taskStatus.getMemoryReservation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
import static io.trino.SystemSessionProperties.getRetryInitialDelay;
import static io.trino.SystemSessionProperties.getRetryMaxDelay;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.getWriterMinSize;
import static io.trino.SystemSessionProperties.getWriterScalingMinDataProcessed;
import static io.trino.execution.QueryState.STARTING;
import static io.trino.execution.scheduler.PipelinedStageExecution.createPipelinedStageExecution;
import static io.trino.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler;
Expand Down Expand Up @@ -1107,7 +1107,7 @@ public void stateChanged(QueryState newState)
writerTasksProvider,
nodeScheduler.createNodeSelector(session, Optional.empty()),
executor,
getWriterMinSize(session),
getWriterScalingMinDataProcessed(session),
partitionCount.get());

whenAllStages(childStageExecutions, StageExecution.State::isDone)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ScaledWriterScheduler
private final Supplier<Collection<TaskStatus>> writerTasksProvider;
private final NodeSelector nodeSelector;
private final ScheduledExecutorService executor;
private final long writerMinSizeBytes;
private final long writerScalingMinDataProcessed;
private final Set<InternalNode> scheduledNodes = new HashSet<>();
private final AtomicBoolean done = new AtomicBoolean();
private final int maxWriterNodeCount;
Expand All @@ -59,15 +59,15 @@ public ScaledWriterScheduler(
Supplier<Collection<TaskStatus>> writerTasksProvider,
NodeSelector nodeSelector,
ScheduledExecutorService executor,
DataSize writerMinSize,
DataSize writerScalingMinDataProcessed,
int maxWriterNodeCount)
{
this.stage = requireNonNull(stage, "stage is null");
this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null");
this.writerTasksProvider = requireNonNull(writerTasksProvider, "writerTasksProvider is null");
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
this.executor = requireNonNull(executor, "executor is null");
this.writerMinSizeBytes = writerMinSize.toBytes();
this.writerScalingMinDataProcessed = writerScalingMinDataProcessed.toBytes();
this.maxWriterNodeCount = maxWriterNodeCount;
}

Expand Down Expand Up @@ -120,17 +120,17 @@ private boolean isSourceTasksBufferFull()
private boolean isWriteThroughputSufficient()
{
Collection<TaskStatus> writerTasks = writerTasksProvider.get();
long writtenBytes = writerTasks.stream()
.map(TaskStatus::getPhysicalWrittenDataSize)
long writerInputBytes = writerTasks.stream()
.map(TaskStatus::getWriterInputDataSize)
.mapToLong(DataSize::toBytes)
.sum();

long minWrittenBytesToScaleUp = writerTasks.stream()
long minWriterInputBytesToScaleUp = writerTasks.stream()
.map(TaskStatus::getMaxWriterCount)
.map(Optional::get)
.mapToLong(writerCount -> writerMinSizeBytes * writerCount)
.mapToLong(writerCount -> writerScalingMinDataProcessed * writerCount)
.sum();
return writtenBytes >= minWrittenBytesToScaleUp;
return writerInputBytes >= minWriterInputBytesToScaleUp;
}

private boolean isWeightedBufferFull()
Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/DriverContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,16 @@ public CounterStat getOutputPositions()
return new CounterStat();
}

public long getWriterInputDataSize()
{
    // Plain loop on purpose — the stream API is intentionally avoided here for performance.
    long total = 0;
    for (OperatorContext operatorContext : operatorContexts) {
        total += operatorContext.getWriterInputDataSize();
    }
    return total;
}

public long getPhysicalWrittenDataSize()
{
// Avoid using stream api for performance reasons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public class OperatorContext
private final AtomicReference<Metrics> metrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value.
private final AtomicReference<Metrics> connectorMetrics = new AtomicReference<>(Metrics.EMPTY); // this is not incremental, but gets overwritten by the latest value.

private final AtomicLong writerInputDataSize = new AtomicLong();
private final AtomicLong physicalWrittenDataSize = new AtomicLong();

private final AtomicReference<SettableFuture<Void>> memoryFuture;
Expand Down Expand Up @@ -245,6 +246,11 @@ public void setFinishedFuture(ListenableFuture<Void> finishedFuture)
checkState(this.finishedFuture.getAndSet(requireNonNull(finishedFuture, "finishedFuture is null")) == null, "finishedFuture already set");
}

public void recordWriterInputDataSize(long sizeInBytes)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need extra methods for this? I think this value is just same as inputDataSize for TableWriter

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do this. However, then we have to sum inputDataSize for only TableWriter operator type in DriverContext to get writerInputDataSize.

{
writerInputDataSize.getAndAdd(sizeInBytes);
}

public void recordPhysicalWrittenData(long sizeInBytes)
{
physicalWrittenDataSize.getAndAdd(sizeInBytes);
Expand Down Expand Up @@ -485,6 +491,11 @@ public CounterStat getOutputPositions()
return outputPositions;
}

public long getWriterInputDataSize()
{
    // Current value of the writer-input byte counter for this operator.
    return this.writerInputDataSize.get();
}

public long getPhysicalWrittenDataSize()
{
return physicalWrittenDataSize.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ public CounterStat getOutputPositions()
return stat;
}

public long getWriterInputDataSize()
{
    // Plain loop on purpose — the stream API is intentionally avoided here for performance.
    long total = 0;
    for (DriverContext driverContext : drivers) {
        total += driverContext.getWriterInputDataSize();
    }
    return total;
}

public long getPhysicalWrittenDataSize()
{
// Avoid using stream api due to performance reasons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public void addInput(Page page)
blocked = asVoid(allAsList(blockedOnAggregation, blockedOnWrite));
rowCount += page.getPositionCount();
updateWrittenBytes();
operatorContext.recordWriterInputDataSize(page.getSizeInBytes());
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ public CounterStat getOutputPositions()
return stat;
}

public long getWriterInputDataSize()
{
    // Plain loop on purpose — the stream API is intentionally avoided here for performance.
    long total = 0;
    for (PipelineContext pipelineContext : pipelineContexts) {
        total += pipelineContext.getWriterInputDataSize();
    }
    return total;
}

public long getPhysicalWrittenDataSize()
{
// Avoid using stream api for performance reasons
Expand Down Expand Up @@ -599,6 +609,7 @@ public TaskStats getTaskStats()
succinctBytes(outputDataSize),
outputPositions,
new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
succinctBytes(getWriterInputDataSize()),
succinctBytes(physicalWrittenDataSize),
getMaxWriterCount(),
fullGcCount,
Expand Down
Loading