Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class SystemSessionProperties
public static final String REDISTRIBUTE_WRITES = "redistribute_writes";
public static final String SCALE_WRITERS = "scale_writers";
public static final String WRITER_MIN_SIZE = "writer_min_size";
public static final String OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER = "optimized_scale_writer_producer_buffer";
public static final String PUSH_TABLE_WRITE_THROUGH_UNION = "push_table_write_through_union";
public static final String EXECUTION_POLICY = "execution_policy";
public static final String DICTIONARY_AGGREGATION = "dictionary_aggregation";
Expand Down Expand Up @@ -299,6 +300,11 @@ public SystemSessionProperties(
false,
value -> DataSize.valueOf((String) value),
DataSize::toString),
booleanProperty(
OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER,
"Optimize scale writer creation based on producer buffer",
featuresConfig.isOptimizedScaleWriterProducerBuffer(),
true),
booleanProperty(
PUSH_TABLE_WRITE_THROUGH_UNION,
"Parallelize writes when using UNION ALL in queries that write data",
Expand Down Expand Up @@ -866,6 +872,11 @@ public static DataSize getWriterMinSize(Session session)
return session.getSystemProperty(WRITER_MIN_SIZE, DataSize.class);
}

/**
 * Reads the {@code optimized_scale_writer_producer_buffer} system property for the given session.
 *
 * @return whether scaled-writer scheduling should also consider producer output buffer utilization
 */
public static boolean isOptimizedScaleWriterProducerBuffer(Session session)
{
return session.getSystemProperty(OPTIMIZED_SCALE_WRITER_PRODUCER_BUFFER, Boolean.class);
}

public static boolean isPushTableWriteThroughUnion(Session session)
{
return session.getSystemProperty(PUSH_TABLE_WRITE_THROUGH_UNION, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ else if (taskHolder.getTaskExecution() != null) {
fullGcTime = taskContext.getFullGcTime();
}

return new TaskStatus(taskStateMachine.getTaskId(),
return new TaskStatus(
taskStateMachine.getTaskId(),
taskInstanceId,
versionNumber,
state,
Expand All @@ -293,6 +294,7 @@ else if (taskHolder.getTaskExecution() != null) {
failures,
queuedPartitionedDrivers,
runningPartitionedDrivers,
outputBuffer.getUtilization(),
isOutputBufferOverutilized(),
physicalWrittenDataSize,
userMemoryReservation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ public class TaskStatus

private final int queuedPartitionedDrivers;
private final int runningPartitionedDrivers;

private final double outputBufferUtilization;
private final boolean outputBufferOverutilized;

private final DataSize physicalWrittenDataSize;
private final DataSize memoryReservation;
private final DataSize systemMemoryReservation;
Expand All @@ -83,6 +86,7 @@ public TaskStatus(
@JsonProperty("failures") List<ExecutionFailureInfo> failures,
@JsonProperty("queuedPartitionedDrivers") int queuedPartitionedDrivers,
@JsonProperty("runningPartitionedDrivers") int runningPartitionedDrivers,
@JsonProperty("outputBufferUtilization") double outputBufferUtilization,
@JsonProperty("outputBufferOverutilized") boolean outputBufferOverutilized,
@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,
@JsonProperty("memoryReservation") DataSize memoryReservation,
Expand All @@ -106,6 +110,7 @@ public TaskStatus(
checkArgument(runningPartitionedDrivers >= 0, "runningPartitionedDrivers must be positive");
this.runningPartitionedDrivers = runningPartitionedDrivers;

this.outputBufferUtilization = outputBufferUtilization;
this.outputBufferOverutilized = outputBufferOverutilized;

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");
Expand Down Expand Up @@ -185,6 +190,12 @@ public DataSize getPhysicalWrittenDataSize()
return physicalWrittenDataSize;
}

/**
 * Utilization of this task's output buffer, serialized into the JSON task status.
 * The value is produced by {@code outputBuffer.getUtilization()} at status-build time
 * (see the TaskStatus construction site); its exact scale is defined by the buffer
 * implementation — presumably a 0..1 fraction, but confirm against OutputBuffer.
 */
@JsonProperty
public double getOutputBufferUtilization()
{
return outputBufferUtilization;
}

@JsonProperty
public boolean isOutputBufferOverutilized()
{
Expand Down Expand Up @@ -237,6 +248,7 @@ public static TaskStatus initialTaskStatus(TaskId taskId, URI location, String n
ImmutableList.of(),
0,
0,
0.0,
false,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand All @@ -258,6 +270,7 @@ public static TaskStatus failWith(TaskStatus taskStatus, TaskState state, List<E
exceptions,
taskStatus.getQueuedPartitionedDrivers(),
taskStatus.getRunningPartitionedDrivers(),
taskStatus.getOutputBufferUtilization(),
taskStatus.isOutputBufferOverutilized(),
taskStatus.getPhysicalWrittenDataSize(),
taskStatus.getMemoryReservation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ public class ScaledWriterScheduler
private final Supplier<Collection<TaskStatus>> writerTasksProvider;
private final NodeSelector nodeSelector;
private final ScheduledExecutorService executor;

private final boolean optimizedScaleWriterProducerBuffer;
private final long writerMinSizeBytes;

private final Set<InternalNode> scheduledNodes = new HashSet<>();

private final AtomicBoolean done = new AtomicBoolean();
private volatile SettableFuture<?> future = SettableFuture.create();

Expand All @@ -56,14 +60,16 @@ public ScaledWriterScheduler(
Supplier<Collection<TaskStatus>> writerTasksProvider,
NodeSelector nodeSelector,
ScheduledExecutorService executor,
DataSize writerMinSize)
DataSize writerMinSize,
boolean optimizedScaleWriterProducerBuffer)
{
this.stage = requireNonNull(stage, "stage is null");
this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null");
this.writerTasksProvider = requireNonNull(writerTasksProvider, "writerTasksProvider is null");
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
this.executor = requireNonNull(executor, "executor is null");
this.writerMinSizeBytes = requireNonNull(writerMinSize, "minWriterSize is null").toBytes();
this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer;
}

public void finish()
Expand Down Expand Up @@ -105,6 +111,18 @@ private int getNewTaskCount()
return 1;
}

if (optimizedScaleWriterProducerBuffer) {
double totalProducerBufferUtilization = sourceTasksProvider.get().stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little bit confused about this logic. I was thinking more about changing the condition to something like: if 50% overutilized OR 90% non-empty, add more writers. Thoughts?

Copy link
Contributor Author

@wenleix wenleix Mar 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arhimondr : Addressed the comments per offline discussion. Basically now we add more writers if the overall buffer utilization is more than the number of writers.

We don't need to worry about accidentally open too many writers since overall writtenBytes has to be more than writerMinSizeBytes * scheduledNodes.size()

            writtenBytes >= writerMinSizeBytes * scheduledNodes.size()) 

.filter(task -> !task.getState().isDone())
.mapToDouble(TaskStatus::getOutputBufferUtilization)
.sum();

if (totalProducerBufferUtilization >= scheduledNodes.size() &&
writtenBytes >= writerMinSizeBytes * scheduledNodes.size()) {
return 1;
}
}

return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static com.facebook.presto.SystemSessionProperties.getConcurrentLifespansPerNode;
import static com.facebook.presto.SystemSessionProperties.getMaxTasksPerStage;
import static com.facebook.presto.SystemSessionProperties.getWriterMinSize;
import static com.facebook.presto.SystemSessionProperties.isOptimizedScaleWriterProducerBuffer;
import static com.facebook.presto.execution.SqlStageExecution.createSqlStageExecution;
import static com.facebook.presto.execution.scheduler.SourcePartitionedScheduler.newSourcePartitionedSchedulerAsStageScheduler;
import static com.facebook.presto.execution.scheduler.TableWriteInfo.createTableWriteInfo;
Expand Down Expand Up @@ -305,7 +306,8 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
writerTasksProvider,
nodeScheduler.createNodeSelector(null),
scheduledExecutor,
getWriterMinSize(session));
getWriterMinSize(session),
isOptimizedScaleWriterProducerBuffer(session));
whenAllStages(childStageExecutions, StageExecutionState::isDone)
.addListener(scheduler::finish, directExecutor());
return scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class FeaturesConfig
private boolean redistributeWrites = true;
private boolean scaleWriters;
private DataSize writerMinSize = new DataSize(32, DataSize.Unit.MEGABYTE);
private boolean optimizedScaleWriterProducerBuffer;
private boolean optimizeMetadataQueries;
private boolean optimizeHashGeneration = true;
private boolean enableIntermediateAggregations;
Expand Down Expand Up @@ -584,6 +585,18 @@ public FeaturesConfig setWriterMinSize(DataSize writerMinSize)
return this;
}

/**
 * Default for the {@code optimized-scale-writer-producer-buffer} feature toggle
 * (false unless set via config or overridden per-session).
 */
public boolean isOptimizedScaleWriterProducerBuffer()
{
return optimizedScaleWriterProducerBuffer;
}

/**
 * Binds the {@code optimized-scale-writer-producer-buffer} config property.
 *
 * @return this config instance, for fluent chaining
 */
@Config("optimized-scale-writer-producer-buffer")
public FeaturesConfig setOptimizedScaleWriterProducerBuffer(boolean optimizedScaleWriterProducerBuffer)
{
this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer;
return this;
}

public boolean isOptimizeMetadataQueries()
{
return optimizeMetadataQueries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public TaskInfo getTaskInfo()
failures,
0,
0,
0.0,
false,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand Down Expand Up @@ -290,6 +291,7 @@ public TaskStatus getTaskStatus()
ImmutableList.of(),
stats.getQueuedPartitionedDrivers(),
stats.getRunningPartitionedDrivers(),
0.0,
false,
stats.getPhysicalWrittenDataSize(),
stats.getUserMemoryReservation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ private TaskStatus buildTaskStatus()
initialTaskStatus.getFailures(),
initialTaskStatus.getQueuedPartitionedDrivers(),
initialTaskStatus.getRunningPartitionedDrivers(),
initialTaskStatus.getOutputBufferUtilization(),
initialTaskStatus.isOutputBufferOverutilized(),
initialTaskStatus.getPhysicalWrittenDataSize(),
initialTaskStatus.getMemoryReservation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void testDefaults()
.setRedistributeWrites(true)
.setScaleWriters(false)
.setWriterMinSize(new DataSize(32, MEGABYTE))
.setOptimizedScaleWriterProducerBuffer(false)
.setOptimizeMetadataQueries(false)
.setOptimizeHashGeneration(true)
.setPushTableWriteThroughUnion(true)
Expand Down Expand Up @@ -171,6 +172,7 @@ public void testExplicitPropertyMappings()
.put("redistribute-writes", "false")
.put("scale-writers", "true")
.put("writer-min-size", "42GB")
.put("optimized-scale-writer-producer-buffer", "true")
.put("optimizer.optimize-metadata-queries", "true")
.put("optimizer.optimize-hash-generation", "false")
.put("optimizer.optimize-mixed-distinct-aggregations", "true")
Expand Down Expand Up @@ -247,6 +249,7 @@ public void testExplicitPropertyMappings()
.setRedistributeWrites(false)
.setScaleWriters(true)
.setWriterMinSize(new DataSize(42, GIGABYTE))
.setOptimizedScaleWriterProducerBuffer(true)
.setOptimizeMetadataQueries(true)
.setOptimizeHashGeneration(false)
.setOptimizeMixedDistinctAggregations(true)
Expand Down