Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public final class SystemSessionProperties
public static final String PULL_EXPRESSION_FROM_LAMBDA_ENABLED = "pull_expression_from_lambda_enabled";
public static final String REWRITE_CONSTANT_ARRAY_CONTAINS_TO_IN_EXPRESSION = "rewrite_constant_array_contains_to_in_expression";
public static final String INFER_INEQUALITY_PREDICATES = "infer_inequality_predicates";
public static final String ENABLE_HISTORY_BASED_SCALED_WRITER = "enable_history_based_scaled_writer";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "native_simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -1751,6 +1752,11 @@ public SystemSessionProperties(
INFER_INEQUALITY_PREDICATES,
"Infer nonequality predicates for joins",
featuresConfig.getInferInequalityPredicates(),
false),
booleanProperty(
ENABLE_HISTORY_BASED_SCALED_WRITER,
"Enable setting the initial number of tasks for scaled writers with HBO",
featuresConfig.isUseHBOForScaledWriters(),
false));
}

Expand Down Expand Up @@ -2920,4 +2926,9 @@ public static boolean shouldInferInequalityPredicates(Session session)
{
return session.getSystemProperty(INFER_INEQUALITY_PREDICATES, Boolean.class);
}

/**
 * Returns whether history-based optimization should seed the initial task
 * count for scaled writers, as configured by the
 * {@code enable_history_based_scaled_writer} session property.
 */
public static boolean useHBOForScaledWriters(Session session)
{
    Boolean enabled = session.getSystemProperty(ENABLE_HISTORY_BASED_SCALED_WRITER, Boolean.class);
    return enabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import com.facebook.presto.spi.statistics.JoinNodeStatistics;
import com.facebook.presto.spi.statistics.PlanStatistics;
import com.facebook.presto.spi.statistics.PlanStatisticsWithSourceInfo;
import com.facebook.presto.spi.statistics.TableWriterNodeStatistics;
import com.facebook.presto.sql.planner.CanonicalPlan;
import com.facebook.presto.sql.planner.PlanNodeCanonicalInfo;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.facebook.presto.sql.planner.planPrinter.PlanNodeStats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
Expand All @@ -52,6 +55,7 @@
import static com.facebook.presto.common.resourceGroups.QueryType.INSERT;
import static com.facebook.presto.common.resourceGroups.QueryType.SELECT;
import static com.facebook.presto.cost.HistoricalPlanStatisticsUtil.updatePlanStatistics;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static com.facebook.presto.sql.planner.planPrinter.PlanNodeStatsSummarizer.aggregateStageStats;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -130,6 +134,7 @@ public Map<PlanNodeWithHash, PlanStatisticsWithSourceInfo> getQueryStats(QueryIn
if (!stageInfo.getPlan().isPresent()) {
continue;
}
boolean isScaledWriterStage = stageInfo.getPlan().isPresent() && stageInfo.getPlan().get().getPartitioning().equals(SCALED_WRITER_DISTRIBUTION);
PlanNode root = stageInfo.getPlan().get().getRoot();
for (PlanNode planNode : forTree(PlanNode::getSources).depthFirstPreOrder(root)) {
if (!planNode.getStatsEquivalentPlanNode().isPresent()) {
Expand All @@ -144,6 +149,16 @@ public Map<PlanNodeWithHash, PlanStatisticsWithSourceInfo> getQueryStats(QueryIn
double nullJoinBuildKeyCount = planNodeStats.getPlanNodeNullJoinBuildKeyCount();
double joinBuildKeyCount = planNodeStats.getPlanNodeJoinBuildKeyCount();

JoinNodeStatistics joinNodeStatistics = JoinNodeStatistics.empty();
if (planNode instanceof JoinNode) {
joinNodeStatistics = new JoinNodeStatistics(Estimate.of(nullJoinBuildKeyCount), Estimate.of(joinBuildKeyCount));
}

TableWriterNodeStatistics tableWriterNodeStatistics = TableWriterNodeStatistics.empty();
if (isScaledWriterStage && planNode instanceof TableWriterNode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is TableWriterMergeNode relevant here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the scaled writer optimization applies only to the table writer node, not to the table writer merge node.

tableWriterNodeStatistics = new TableWriterNodeStatistics(Estimate.of(stageInfo.getLatestAttemptExecutionInfo().getStats().getTotalTasks()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the number of tasks for the stage, and record it.

}

PlanNode statsEquivalentPlanNode = planNode.getStatsEquivalentPlanNode().get();
for (PlanCanonicalizationStrategy strategy : historyBasedPlanCanonicalizationStrategyList()) {
Optional<PlanNodeCanonicalInfo> planNodeCanonicalInfo = Optional.ofNullable(
Expand All @@ -152,20 +167,23 @@ public Map<PlanNodeWithHash, PlanStatisticsWithSourceInfo> getQueryStats(QueryIn
String hash = planNodeCanonicalInfo.get().getHash();
List<PlanStatistics> inputTableStatistics = planNodeCanonicalInfo.get().getInputTableStatistics();
PlanNodeWithHash planNodeWithHash = new PlanNodeWithHash(statsEquivalentPlanNode, Optional.of(hash));
// Plan node added after HistoricalStatisticsEquivalentPlanMarkingOptimizer will have the same hash as its source node. If the source node is join node,
// the newly added node will have the same hash with the join but no join statistics, hence we need to overwrite in this case.
if (!planStatistics.containsKey(planNodeWithHash) || nullJoinBuildKeyCount > 0 || joinBuildKeyCount > 0) {
planStatistics.put(
planNodeWithHash,
new PlanStatisticsWithSourceInfo(
planNode.getId(),
new PlanStatistics(
Estimate.of(outputPositions),
Double.isNaN(outputBytes) ? Estimate.unknown() : Estimate.of(outputBytes),
1.0,
new JoinNodeStatistics(Estimate.of(nullJoinBuildKeyCount), Estimate.of(joinBuildKeyCount))),
new HistoryBasedSourceInfo(Optional.of(hash), Optional.of(inputTableStatistics))));
// Plan node added after HistoricalStatisticsEquivalentPlanMarkingOptimizer will have the same hash as its source node. If the source node is not join or
// table writer node, the newly added node will have the same hash but no join/table writer statistics, hence we need to overwrite in this case.
PlanStatistics newPlanNodeStats = new PlanStatistics(
Estimate.of(outputPositions),
Double.isNaN(outputBytes) ? Estimate.unknown() : Estimate.of(outputBytes),
1.0,
joinNodeStatistics,
tableWriterNodeStatistics);
if (planStatistics.containsKey(planNodeWithHash)) {
newPlanNodeStats = planStatistics.get(planNodeWithHash).getPlanStatistics().update(newPlanNodeStats);
}
planStatistics.put(
planNodeWithHash,
new PlanStatisticsWithSourceInfo(
planNode.getId(),
newPlanNodeStats,
new HistoryBasedSourceInfo(Optional.of(hash), Optional.of(inputTableStatistics))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
*/
package com.facebook.presto.cost;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -25,7 +28,8 @@ public class JoinNodeStatsEstimate
private final double nullJoinBuildKeyCount;
private final double joinBuildKeyCount;

public JoinNodeStatsEstimate(double nullJoinBuildKeyCount, double joinBuildKeyCount)
@JsonCreator
public JoinNodeStatsEstimate(@JsonProperty("nullJoinBuildKeyCount") double nullJoinBuildKeyCount, @JsonProperty("joinBuildKeyCount") double joinBuildKeyCount)
{
this.nullJoinBuildKeyCount = nullJoinBuildKeyCount;
this.joinBuildKeyCount = joinBuildKeyCount;
Expand All @@ -36,11 +40,13 @@ public static JoinNodeStatsEstimate unknown()
return UNKNOWN;
}

/**
 * Count of join build keys that were null, as observed at runtime.
 * NOTE(review): presumably NaN when unknown (see {@code unknown()}) — confirm
 * against the UNKNOWN constant, which is not visible in this hunk.
 */
@JsonProperty
public double getNullJoinBuildKeyCount()
{
    return nullJoinBuildKeyCount;
}

@JsonProperty
public double getJoinBuildKeyCount()
{
return joinBuildKeyCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.spi.statistics.PlanStatistics;
import com.facebook.presto.spi.statistics.PlanStatisticsWithSourceInfo;
import com.facebook.presto.spi.statistics.SourceInfo;
import com.facebook.presto.spi.statistics.TableWriterNodeStatistics;
import com.facebook.presto.sql.Serialization;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -49,7 +50,7 @@
public class PlanNodeStatsEstimate
{
private static final double DEFAULT_DATA_SIZE_PER_COLUMN = 50;
private static final PlanNodeStatsEstimate UNKNOWN = new PlanNodeStatsEstimate(NaN, NaN, false, ImmutableMap.of());
private static final PlanNodeStatsEstimate UNKNOWN = new PlanNodeStatsEstimate(NaN, NaN, false, ImmutableMap.of(), JoinNodeStatsEstimate.unknown(), TableWriterNodeStatsEstimate.unknown());

private final double outputRowCount;
private final double totalSize;
Expand All @@ -59,6 +60,8 @@ public class PlanNodeStatsEstimate

private final JoinNodeStatsEstimate joinNodeStatsEstimate;

private final TableWriterNodeStatsEstimate tableWriterNodeStatsEstimate;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add table writer stats to estimate

public static PlanNodeStatsEstimate unknown()
{
return UNKNOWN;
Expand All @@ -69,9 +72,11 @@ public PlanNodeStatsEstimate(
@JsonProperty("outputRowCount") double outputRowCount,
@JsonProperty("totalSize") double totalSize,
@JsonProperty("confident") boolean confident,
@JsonProperty("variableStatistics") Map<VariableReferenceExpression, VariableStatsEstimate> variableStatistics)
@JsonProperty("variableStatistics") Map<VariableReferenceExpression, VariableStatsEstimate> variableStatistics,
@JsonProperty("joinNodeStatsEstimate") JoinNodeStatsEstimate joinNodeStatsEstimate,
@JsonProperty("tableWriterNodeStatsEstimate") TableWriterNodeStatsEstimate tableWriterNodeStatsEstimate)
{
this(outputRowCount, totalSize, confident, HashTreePMap.from(requireNonNull(variableStatistics, "variableStatistics is null")));
this(outputRowCount, totalSize, HashTreePMap.from(requireNonNull(variableStatistics, "variableStatistics is null")), new CostBasedSourceInfo(confident), joinNodeStatsEstimate, tableWriterNodeStatsEstimate);
}

private PlanNodeStatsEstimate(double outputRowCount, double totalSize, boolean confident, PMap<VariableReferenceExpression, VariableStatsEstimate> variableStatistics)
Expand All @@ -81,18 +86,19 @@ private PlanNodeStatsEstimate(double outputRowCount, double totalSize, boolean c

/**
 * Convenience constructor that leaves the join-specific and table-writer-specific
 * statistics unknown. Delegates to the primary constructor.
 */
public PlanNodeStatsEstimate(double outputRowCount, double totalSize, PMap<VariableReferenceExpression, VariableStatsEstimate> variableStatistics, SourceInfo sourceInfo)
{
    this(outputRowCount, totalSize, variableStatistics, sourceInfo, JoinNodeStatsEstimate.unknown(), TableWriterNodeStatsEstimate.unknown());
}

/**
 * Primary constructor.
 *
 * @param outputRowCount estimated number of output rows; NaN when unknown, must not be negative
 * @param totalSize estimated output size in bytes; NaN when unknown
 * @param variableStatistics per-variable statistics for the node's output
 * @param sourceInfo provenance of this estimate (e.g. cost-based or history-based)
 * @param joinNodeStatsEstimate join-specific statistics; unknown() for non-join nodes
 * @param tableWriterNodeStatsEstimate table-writer-specific statistics; unknown() for non-writer nodes
 */
public PlanNodeStatsEstimate(double outputRowCount, double totalSize, PMap<VariableReferenceExpression, VariableStatsEstimate> variableStatistics, SourceInfo sourceInfo,
        JoinNodeStatsEstimate joinNodeStatsEstimate, TableWriterNodeStatsEstimate tableWriterNodeStatsEstimate)
{
    checkArgument(isNaN(outputRowCount) || outputRowCount >= 0, "outputRowCount cannot be negative");
    this.outputRowCount = outputRowCount;
    this.totalSize = totalSize;
    this.variableStatistics = variableStatistics;
    // Error messages must match the parameter names (originals said
    // "SourceInfo" and "joinNodeSpecificStatsEstimate").
    this.sourceInfo = requireNonNull(sourceInfo, "sourceInfo is null");
    this.joinNodeStatsEstimate = requireNonNull(joinNodeStatsEstimate, "joinNodeStatsEstimate is null");
    this.tableWriterNodeStatsEstimate = requireNonNull(tableWriterNodeStatsEstimate, "tableWriterNodeStatsEstimate is null");
}

/**
Expand Down Expand Up @@ -122,11 +128,18 @@ public SourceInfo getSourceInfo()
return sourceInfo;
}

/**
 * Join-specific statistics (null-key and total build-side key counts)
 * attached to this estimate; may be unknown for non-join nodes.
 */
@JsonProperty
public JoinNodeStatsEstimate getJoinNodeStatsEstimate()
{
    return joinNodeStatsEstimate;
}

/**
 * Table-writer-specific statistics (task count observed when the scaled
 * writer distribution was used); may be unknown for non-writer nodes.
 */
@JsonProperty
public TableWriterNodeStatsEstimate getTableWriterNodeStatsEstimate()
{
    return tableWriterNodeStatsEstimate;
}

/**
* Only use when getting all columns and meanwhile do not want to
* do per-column estimation.
Expand Down Expand Up @@ -241,9 +254,12 @@ public PlanNodeStatsEstimate combineStats(PlanStatistics planStatistics, SourceI
planStatistics.getOutputSize().getValue(),
variableStatistics,
statsSourceInfo,
new JoinNodeStatsEstimate(
planStatistics.getJoinNodeStatistics().getNullJoinBuildKeyCount().getValue(),
planStatistics.getJoinNodeStatistics().getJoinBuildKeyCount().getValue()));
planStatistics.getJoinNodeStatistics().isEmpty() ? getJoinNodeStatsEstimate() :
new JoinNodeStatsEstimate(
planStatistics.getJoinNodeStatistics().getNullJoinBuildKeyCount().getValue(),
planStatistics.getJoinNodeStatistics().getJoinBuildKeyCount().getValue()),
planStatistics.getTableWriterNodeStatistics().isEmpty() ? getTableWriterNodeStatsEstimate() :
new TableWriterNodeStatsEstimate(planStatistics.getTableWriterNodeStatistics().getTaskCountIfScaledWriter().getValue()));
}
return this;
}
Expand Down Expand Up @@ -293,7 +309,8 @@ public PlanStatisticsWithSourceInfo toPlanStatisticsWithSourceInfo(PlanNodeId id
sourceInfo.isConfident() ? 1 : 0,
new JoinNodeStatistics(
Estimate.estimateFromDouble(joinNodeStatsEstimate.getNullJoinBuildKeyCount()),
Estimate.estimateFromDouble(joinNodeStatsEstimate.getJoinBuildKeyCount()))),
Estimate.estimateFromDouble(joinNodeStatsEstimate.getJoinBuildKeyCount())),
new TableWriterNodeStatistics(Estimate.estimateFromDouble(tableWriterNodeStatsEstimate.getTaskCountIfScaledWriter()))),
sourceInfo);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.cost;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.lang.Double.NaN;

/**
 * Writer-specific statistics attached to a plan-node estimate: the number of
 * tasks a stage ran with when the scaled writer distribution was used.
 * {@link #unknown()} carries {@code NaN} to signal that no value is available.
 */
public class TableWriterNodeStatsEstimate
{
    private static final TableWriterNodeStatsEstimate UNKNOWN = new TableWriterNodeStatsEstimate(NaN);

    // NaN when the estimate is unknown.
    private final double taskCountIfScaledWriter;

    @JsonCreator
    public TableWriterNodeStatsEstimate(@JsonProperty("taskCountIfScaledWriter") double taskCountIfScaledWriter)
    {
        this.taskCountIfScaledWriter = taskCountIfScaledWriter;
    }

    public static TableWriterNodeStatsEstimate unknown()
    {
        return UNKNOWN;
    }

    @JsonProperty
    public double getTaskCountIfScaledWriter()
    {
        return taskCountIfScaledWriter;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) {
            return true;
        }
        // Double.compare treats NaN as equal to NaN, so two unknown
        // estimates compare equal.
        return o != null
                && getClass() == o.getClass()
                && Double.compare(taskCountIfScaledWriter, ((TableWriterNodeStatsEstimate) o).taskCountIfScaledWriter) == 0;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(taskCountIfScaledWriter);
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("taskCountIfScaledWriter", taskCountIfScaledWriter)
                .toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ScaledWriterScheduler

private final boolean optimizedScaleWriterProducerBuffer;
private final long writerMinSizeBytes;
private final Optional<Integer> initialTaskCount;

private final Set<InternalNode> scheduledNodes = new HashSet<>();

Expand All @@ -61,7 +62,8 @@ public ScaledWriterScheduler(
NodeSelector nodeSelector,
ScheduledExecutorService executor,
DataSize writerMinSize,
boolean optimizedScaleWriterProducerBuffer)
boolean optimizedScaleWriterProducerBuffer,
Optional<Integer> initialTaskCount)
{
this.stage = requireNonNull(stage, "stage is null");
this.sourceTasksProvider = requireNonNull(sourceTasksProvider, "sourceTasksProvider is null");
Expand All @@ -70,6 +72,7 @@ public ScaledWriterScheduler(
this.executor = requireNonNull(executor, "executor is null");
this.writerMinSizeBytes = requireNonNull(writerMinSize, "minWriterSize is null").toBytes();
this.optimizedScaleWriterProducerBuffer = optimizedScaleWriterProducerBuffer;
this.initialTaskCount = requireNonNull(initialTaskCount, "initialTaskCount is null");
}

public void finish()
Expand All @@ -93,7 +96,7 @@ public ScheduleResult schedule()
private int getNewTaskCount()
{
if (scheduledNodes.isEmpty()) {
return 1;
return initialTaskCount.orElse(1);
}

double fullTasks = sourceTasksProvider.get().stream()
Expand Down
Loading