diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 510a473cee98a..d2d83140f66ab 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -89,6 +89,7 @@ public final class SystemSessionProperties public static final String REDISTRIBUTE_WRITES = "redistribute_writes"; public static final String SCALE_WRITERS = "scale_writers"; public static final String WRITER_MIN_SIZE = "writer_min_size"; + public static final String OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER = "optimized_scale_writer_producer_buffer"; public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union"; public static final String EXECUTION_POLICY = "execution_policy"; public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation"; @@ -299,6 +300,11 @@ public SystemSessionProperties( false, value -> DataSize.valueOf((String) value), DataSize::toString), + booleanProperty( + OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER, + "Optimize scale writer creation based on producer buffer", + featuresConfig.isOptimizedScaleWriterProducerBuffer(), + true), booleanProperty( PUSH_TABLE_WRITE_THROUGH_UNION, "Parallelize writes when using UNION ALL in queries that write data", @@ -866,6 +872,11 @@ public static DataSize getWriterMinSize(Session session) return session.getSystemProperty(WRITER_MIN_SIZE, DataSize.class); } + public static boolean isOptimizedScaleWriterProducerBuffer(Session session) + { + return session.getSystemProperty(OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER, Boolean.class); + } + public static boolean isPushTableWriteThroughUnion(Session session) { return session.getSystemProperty(PUSH_TABLE_WRITE_THROUGH_UNION, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java index fca796bfd8c71..ccd8574835b9e 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java @@ -283,7 +283,8 @@ else if (taskHolder.getTaskExecution() != null) { fullGcTime = taskContext.getFullGcTime(); } - return new TaskStatus(taskStateMachine.getTaskId(), + return new TaskStatus( + taskStateMachine.getTaskId(), taskInstanceId, versionNumber, state, @@ -293,6 +294,7 @@ else if (taskHolder.getTaskExecution() != null) { failures, queuedPartitionedDrivers, runningPartitionedDrivers, + outputBuffer.getUtilization(), isOutputBufferOverutilized(), physicalWrittenDataSize, userMemoryReservation, diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskStatus.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskStatus.java index c935338b319f0..015aa8761e42d 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskStatus.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskStatus.java @@ -61,7 +61,10 @@ public class TaskStatus private final int queuedPartitionedDrivers; private final int runningPartitionedDrivers; + + private final double outputBufferUtilization; private final boolean outputBufferOverutilized; + private final DataSize physicalWrittenDataSize; private final DataSize memoryReservation; private final DataSize systemMemoryReservation; @@ -83,6 +86,7 @@ public TaskStatus( @JsonProperty("failures") List failures, @JsonProperty("queuedPartitionedDrivers") int queuedPartitionedDrivers, @JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers, + @JsonProperty("outputBufferUtilization") double outputBufferUtilization, @JsonProperty("outputBufferOverutilized") boolean outputBufferOverutilized, @JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize, @JsonProperty("memoryReservation") DataSize memoryReservation, @@ -106,6 +110,7 @@ public TaskStatus( checkArgument(runningPartitionedDrivers >= 0, "runningPartitionedDrivers must be positive"); this.runningPartitionedDrivers = runningPartitionedDrivers; + this.outputBufferUtilization = outputBufferUtilization; this.outputBufferOverutilized = outputBufferOverutilized; this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null"); @@ -185,6 +190,12 @@ public DataSize getPhysicalWrittenDataSize() return physicalWrittenDataSize; } + @JsonProperty + public double getOutputBufferUtilization() + { + return outputBufferUtilization; + } + @JsonProperty public boolean isOutputBufferOverutilized() { @@ -237,6 +248,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n ImmutableList.of(), 0, 0, + 0.0, false, new DataSize(0, BYTE), new DataSize(0, BYTE), @@ -258,6 +270,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List> writerTasksProvider; private final NodeSelector nodeSelector; private final ScheduledExecutorService executor; + + private final boolean optimizedScaleWriterProducerBuffer; private final long writerMinSizeBytes; + private final Set scheduledNodes = new HashSet<>(); + private final AtomicBoolean done = new AtomicBoolean(); private volatile SettableFuture future = SettableFuture.create(); @@ -56,7 +60,8 @@ public ScaledWriterScheduler( Supplier> writerTasksProvider, NodeSelector nodeSelector, ScheduledExecutorService executor, - DataSize writerMinSize) + DataSize writerMinSize, + boolean optimizedScaleWriterProducerBuffer) { this.stage = requireNonNull(stage, "stage is null"); this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null"); @@ -64,6 +69,7 @@ public ScaledWriterScheduler( this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null"); this.executor = requireNonNull(executor, "executor is null"); this.writerMinSizeBytes = requireNonNull(writerMinSize, "minWriterSize is null").toBytes(); + this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer; } public void finish() @@ -105,6 +111,18 @@ private int getNewTaskCount() return 1; } + if (optimizedScaleWriterProducerBuffer) { + double totalProducerBufferUtilization = sourceTasksProvider.get().stream() + .filter(task -> !task.getState().isDone()) + .mapToDouble(TaskStatus::getOutputBufferUtilization) + .sum(); + + if (totalProducerBufferUtilization >= scheduledNodes.size() && + writtenBytes >= writerMinSizeBytes * scheduledNodes.size()) { + return 1; + } + } + return 0; } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index 66b9c298f39f9..33f6646aacbfe 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -66,6 +66,7 @@ import static com.facebook.presto.SystemSessionProperties.getConcurrentLifespansPerNode; import static com.facebook.presto.SystemSessionProperties.getMaxTasksPerStage; import static com.facebook.presto.SystemSessionProperties.getWriterMinSize; +import static com.facebook.presto.SystemSessionProperties.isOptimizedScaleWriterProducerBuffer; import static com.facebook.presto.execution.SqlStageExecution.createSqlStageExecution; import static com.facebook.presto.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler; import static com.facebook.presto.execution.scheduler.TableWriteInfo.createTableWriteInfo; @@ -305,7 +306,8 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { writerTasksProvider, nodeScheduler.createNodeSelector(null), scheduledExecutor, - getWriterMinSize(session)); + getWriterMinSize(session), + isOptimizedScaleWriterProducerBuffer(session)); whenAllStages(childStageExecutions, StageExecutionState::isDone) .addListener(scheduler::finish, directExecutor()); return scheduler; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 3c193bd8f4ae9..7efc0108e5bb3 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -83,6 +83,7 @@ public class FeaturesConfig private boolean redistributeWrites = true; private boolean scaleWriters; private DataSize writerMinSize = new DataSize(32, DataSize.Unit.MEGABYTE); + private boolean optimizedScaleWriterProducerBuffer; private boolean optimizeMetadataQueries; private boolean optimizeHashGeneration = true; private boolean enableIntermediateAggregations; @@ -584,6 +585,18 @@ public FeaturesConfig setWriterMinSize(DataSize writerMinSize) return this; } + public boolean isOptimizedScaleWriterProducerBuffer() + { + return optimizedScaleWriterProducerBuffer; + } + + @Config("optimized-scale-writer-producer-buffer") + public FeaturesConfig setOptimizedScaleWriterProducerBuffer(boolean optimizedScaleWriterProducerBuffer) + { + this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer; + return this; + } + public boolean isOptimizeMetadataQueries() { return optimizeMetadataQueries; diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java index 9907cdc67ac87..9258d8ce4e9d4 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java @@ -257,6 +257,7 @@ public TaskInfo getTaskInfo() failures, 0, 0, + 0.0, false, new DataSize(0, BYTE), new DataSize(0, BYTE), @@ -290,6 +291,7 @@ public TaskStatus getTaskStatus() ImmutableList.of(), stats.getQueuedPartitionedDrivers(), stats.getRunningPartitionedDrivers(), + 0.0, false, stats.getPhysicalWrittenDataSize(), stats.getUserMemoryReservation(), diff --git a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java index ea8fd12db3677..310d6a473d6fc 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java +++ b/presto-main/src/test/java/com/facebook/presto/server/remotetask/TestHttpRemoteTask.java @@ -498,6 +498,7 @@ private TaskStatus buildTaskStatus() initialTaskStatus.getFailures(), initialTaskStatus.getQueuedPartitionedDrivers(), initialTaskStatus.getRunningPartitionedDrivers(), + initialTaskStatus.getOutputBufferUtilization(), initialTaskStatus.isOutputBufferOverutilized(), initialTaskStatus.getPhysicalWrittenDataSize(), initialTaskStatus.getMemoryReservation(), diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index cf7bbeaca6939..9c104f45e7a27 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -73,6 +73,7 @@ public void testDefaults() .setRedistributeWrites(true) .setScaleWriters(false) .setWriterMinSize(new DataSize(32, MEGABYTE)) + .setOptimizedScaleWriterProducerBuffer(false) .setOptimizeMetadataQueries(false) .setOptimizeHashGeneration(true) .setPushTableWriteThroughUnion(true) @@ -171,6 +172,7 @@ public void testExplicitPropertyMappings() .put("redistribute-writes", "false") .put("scale-writers", "true") .put("writer-min-size", "42GB") + .put("optimized-scale-writer-producer-buffer", "true") .put("optimizer.optimize-metadata-queries", "true") .put("optimizer.optimize-hash-generation", "false") .put("optimizer.optimize-mixed-distinct-aggregations", "true") @@ -247,6 +249,7 @@ public void testExplicitPropertyMappings() .setRedistributeWrites(false) .setScaleWriters(true) .setWriterMinSize(new DataSize(42, GIGABYTE)) + .setOptimizedScaleWriterProducerBuffer(true) .setOptimizeMetadataQueries(true) .setOptimizeHashGeneration(false) .setOptimizeMixedDistinctAggregations(true)