Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ public Map<PlanNodeWithHash, PlanStatisticsWithSourceInfo> getQueryStats(QueryIn
double outputBytes = adjustedOutputBytes(planNode, planNodeStats);
double nullJoinBuildKeyCount = planNodeStats.getPlanNodeNullJoinBuildKeyCount();
double joinBuildKeyCount = planNodeStats.getPlanNodeJoinBuildKeyCount();
double nullJoinProbeKeyCount = planNodeStats.getPlanNodeNullJoinProbeKeyCount();
double joinProbeKeyCount = planNodeStats.getPlanNodeJoinProbeKeyCount();

JoinNodeStatistics joinNodeStatistics = JoinNodeStatistics.empty();
if (planNode instanceof JoinNode) {
joinNodeStatistics = new JoinNodeStatistics(Estimate.of(nullJoinBuildKeyCount), Estimate.of(joinBuildKeyCount));
joinNodeStatistics = new JoinNodeStatistics(Estimate.of(nullJoinBuildKeyCount), Estimate.of(joinBuildKeyCount), Estimate.of(nullJoinProbeKeyCount), Estimate.of(joinProbeKeyCount));
}

TableWriterNodeStatistics tableWriterNodeStatistics = TableWriterNodeStatistics.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@

public class JoinNodeStatsEstimate
{
private static final JoinNodeStatsEstimate UNKNOWN = new JoinNodeStatsEstimate(NaN, NaN);
private static final JoinNodeStatsEstimate UNKNOWN = new JoinNodeStatsEstimate(NaN, NaN, NaN, NaN);

private final double nullJoinBuildKeyCount;
private final double joinBuildKeyCount;
private final double nullJoinProbeKeyCount;
private final double joinProbeKeyCount;

@JsonCreator
public JoinNodeStatsEstimate(@JsonProperty("nullJoinBuildKeyCount") double nullJoinBuildKeyCount, @JsonProperty("joinBuildKeyCount") double joinBuildKeyCount)
public JoinNodeStatsEstimate(
@JsonProperty("nullJoinBuildKeyCount") double nullJoinBuildKeyCount,
@JsonProperty("joinBuildKeyCount") double joinBuildKeyCount,
@JsonProperty("nullJoinProbeKeyCount") double nullJoinProbeKeyCount,
@JsonProperty("joinProbeKeyCount") double joinProbeKeyCount)
{
this.nullJoinBuildKeyCount = nullJoinBuildKeyCount;
this.joinBuildKeyCount = joinBuildKeyCount;
this.nullJoinProbeKeyCount = nullJoinProbeKeyCount;
this.joinProbeKeyCount = joinProbeKeyCount;
}

public static JoinNodeStatsEstimate unknown()
Expand All @@ -52,12 +60,26 @@ public double getJoinBuildKeyCount()
return joinBuildKeyCount;
}

@JsonProperty
public double getNullJoinProbeKeyCount()
{
return nullJoinProbeKeyCount;
}

@JsonProperty
public double getJoinProbeKeyCount()
{
return joinProbeKeyCount;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("nullJoinBuildKeyCount", nullJoinBuildKeyCount)
.add("joinBuildKeyCount", joinBuildKeyCount)
.add("nullJoinProbeKeyCount", nullJoinProbeKeyCount)
.add("joinProbeKeyCount", joinProbeKeyCount)
.toString();
}

Expand All @@ -72,12 +94,14 @@ public boolean equals(Object o)
}
JoinNodeStatsEstimate that = (JoinNodeStatsEstimate) o;
return Double.compare(nullJoinBuildKeyCount, that.nullJoinBuildKeyCount) == 0 &&
Double.compare(joinBuildKeyCount, that.joinBuildKeyCount) == 0;
Double.compare(joinBuildKeyCount, that.joinBuildKeyCount) == 0 &&
Double.compare(nullJoinProbeKeyCount, that.nullJoinProbeKeyCount) == 0 &&
Double.compare(joinProbeKeyCount, that.joinProbeKeyCount) == 0;
}

@Override
public int hashCode()
{
return Objects.hash(nullJoinBuildKeyCount, joinBuildKeyCount);
return Objects.hash(nullJoinBuildKeyCount, joinBuildKeyCount, nullJoinProbeKeyCount, joinProbeKeyCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ public PlanNodeStatsEstimate combineStats(PlanStatistics planStatistics, SourceI
planStatistics.getJoinNodeStatistics().isEmpty() ? getJoinNodeStatsEstimate() :
new JoinNodeStatsEstimate(
planStatistics.getJoinNodeStatistics().getNullJoinBuildKeyCount().getValue(),
planStatistics.getJoinNodeStatistics().getJoinBuildKeyCount().getValue()),
planStatistics.getJoinNodeStatistics().getJoinBuildKeyCount().getValue(),
planStatistics.getJoinNodeStatistics().getNullJoinProbeKeyCount().getValue(),
planStatistics.getJoinNodeStatistics().getJoinProbeKeyCount().getValue()),
planStatistics.getTableWriterNodeStatistics().isEmpty() ? getTableWriterNodeStatsEstimate() :
new TableWriterNodeStatsEstimate(planStatistics.getTableWriterNodeStatistics().getTaskCountIfScaledWriter().getValue()));
}
Expand Down Expand Up @@ -309,7 +311,9 @@ public PlanStatisticsWithSourceInfo toPlanStatisticsWithSourceInfo(PlanNodeId id
sourceInfo.isConfident() ? 1 : 0,
new JoinNodeStatistics(
Estimate.estimateFromDouble(joinNodeStatsEstimate.getNullJoinBuildKeyCount()),
Estimate.estimateFromDouble(joinNodeStatsEstimate.getJoinBuildKeyCount())),
Estimate.estimateFromDouble(joinNodeStatsEstimate.getJoinBuildKeyCount()),
Estimate.estimateFromDouble(joinNodeStatsEstimate.getNullJoinProbeKeyCount()),
Estimate.estimateFromDouble(joinNodeStatsEstimate.getJoinProbeKeyCount())),
new TableWriterNodeStatistics(Estimate.estimateFromDouble(tableWriterNodeStatsEstimate.getTaskCountIfScaledWriter()))),
sourceInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public JoinProbe createJoinProbe(Page page)
private final boolean probeMayHaveNull;

private int position = -1;
private int nullRowCount;

private JoinProbe(int[] probeOutputChannels, Page page, Page probePage, @Nullable Block probeHashBlock)
{
Expand All @@ -79,6 +80,7 @@ public boolean advanceNextPosition()
public long getCurrentJoinPosition(LookupSource lookupSource)
{
if (probeMayHaveNull && currentRowContainsNull()) {
++nullRowCount;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Piggyback the count here, without adding overhead in counting.

return -1;
}
if (probeHashBlock != null) {
Expand Down Expand Up @@ -117,4 +119,14 @@ private static boolean probeMayHaveNull(Page probePage)
}
return false;
}

public int getNullRowCount()
{
return nullRowCount;
}

public int getPositionCount()
{
return positionCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class LookupJoinOperator
private Optional<ListenableFuture<Supplier<LookupSource>>> unspilledLookupSource = Optional.empty();
private Iterator<Page> unspilledInputPages = emptyIterator();
private final boolean optimizeProbeForEmptyBuild;
private long nullProbeRowCount;
private long inputProbeRowCount;

public LookupJoinOperator(
OperatorContext operatorContext,
Expand Down Expand Up @@ -404,6 +406,8 @@ private void tryUnspillNext()
lookupSourceProvider = null;
}
spiller.ifPresent(PartitioningSpiller::verifyAllPartitionsRead);
operatorContext.recordJoinProbeKeyCount(inputProbeRowCount);
operatorContext.recordNullJoinProbeKeyCount(nullProbeRowCount);
finished = true;
}

Expand Down Expand Up @@ -715,6 +719,10 @@ private void buildPage()
private void clearProbe()
{
// Before updating the probe flush the current page
if (probe != null) {
nullProbeRowCount += probe.getNullRowCount();
inputProbeRowCount += probe.getPositionCount();
}
buildPage();
probe = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public class OperatorContext
private final AtomicLong nullJoinBuildKeyCount = new AtomicLong();
// Number of elements in hash table for join operator
private final AtomicLong joinBuildKeyCount = new AtomicLong();
// Number of NULL probe rows for join operator
private final AtomicLong nullJoinProbeKeyCount = new AtomicLong();
// Number of probe rows for join operator
private final AtomicLong joinProbeKeyCount = new AtomicLong();

private final AtomicLong additionalCpuNanos = new AtomicLong();

Expand Down Expand Up @@ -236,6 +240,16 @@ public void recordJoinBuildKeyCount(long positions)
joinBuildKeyCount.getAndAdd(positions);
}

public void recordNullJoinProbeKeyCount(long positions)
{
nullJoinProbeKeyCount.getAndAdd(positions);
}

public void recordJoinProbeKeyCount(long positions)
{
joinProbeKeyCount.getAndAdd(positions);
}

public void recordPhysicalWrittenData(long sizeInBytes)
{
physicalWrittenDataSize.getAndAdd(sizeInBytes);
Expand Down Expand Up @@ -562,7 +576,9 @@ public OperatorStats getOperatorStats()
info,
runtimeStats,
nullJoinBuildKeyCount.get(),
joinBuildKeyCount.get());
joinBuildKeyCount.get(),
nullJoinProbeKeyCount.get(),
joinProbeKeyCount.get());
}

public <C, R> R accept(QueryContextVisitor<C, R> visitor, C context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class OperatorStats

private final long nullJoinBuildKeyCount;
private final long joinBuildKeyCount;
private final long nullJoinProbeKeyCount;
private final long joinProbeKeyCount;

@JsonCreator
public OperatorStats(
Expand Down Expand Up @@ -151,7 +153,9 @@ public OperatorStats(
@JsonProperty("info") OperatorInfo info,
@JsonProperty("runtimeStats") RuntimeStats runtimeStats,
@JsonProperty("nullJoinBuildKeyCount") long nullJoinBuildKeyCount,
@JsonProperty("joinBuildKeyCount") long joinBuildKeyCount)
@JsonProperty("joinBuildKeyCount") long joinBuildKeyCount,
@JsonProperty("nullJoinProbeKeyCount") long nullJoinProbeKeyCount,
@JsonProperty("joinProbeKeyCount") long joinProbeKeyCount)
{
this.stageId = stageId;
this.stageExecutionId = stageExecutionId;
Expand Down Expand Up @@ -211,6 +215,8 @@ public OperatorStats(
this.infoUnion = null;
this.nullJoinBuildKeyCount = nullJoinBuildKeyCount;
this.joinBuildKeyCount = joinBuildKeyCount;
this.nullJoinProbeKeyCount = nullJoinProbeKeyCount;
this.joinProbeKeyCount = joinProbeKeyCount;
}

@ThriftConstructor
Expand Down Expand Up @@ -266,7 +272,9 @@ public OperatorStats(
@Nullable
OperatorInfoUnion infoUnion,
long nullJoinBuildKeyCount,
long joinBuildKeyCount)
long joinBuildKeyCount,
long nullJoinProbeKeyCount,
long joinProbeKeyCount)
{
this.stageId = stageId;
this.stageExecutionId = stageExecutionId;
Expand Down Expand Up @@ -326,6 +334,8 @@ public OperatorStats(
this.info = null;
this.nullJoinBuildKeyCount = nullJoinBuildKeyCount;
this.joinBuildKeyCount = joinBuildKeyCount;
this.nullJoinProbeKeyCount = nullJoinProbeKeyCount;
this.joinProbeKeyCount = joinProbeKeyCount;
}

@JsonProperty
Expand Down Expand Up @@ -623,6 +633,20 @@ public long getJoinBuildKeyCount()
return joinBuildKeyCount;
}

@JsonProperty
@ThriftField(42)
public long getNullJoinProbeKeyCount()
{
return nullJoinProbeKeyCount;
}

@JsonProperty
@ThriftField(43)
public long getJoinProbeKeyCount()
{
return joinProbeKeyCount;
}

public OperatorStats add(OperatorStats operatorStats)
{
return add(ImmutableList.of(operatorStats));
Expand Down Expand Up @@ -674,6 +698,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)

long nullJoinBuildKeyCount = this.nullJoinBuildKeyCount;
long joinBuildKeyCount = this.joinBuildKeyCount;
long nullJoinProbeKeyCount = this.nullJoinProbeKeyCount;
long joinProbeKeyCount = this.joinProbeKeyCount;

Mergeable<OperatorInfo> base = getMergeableInfoOrNull(info);
for (OperatorStats operator : operators) {
Expand Down Expand Up @@ -731,6 +757,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)

nullJoinBuildKeyCount += operator.getNullJoinBuildKeyCount();
joinBuildKeyCount += operator.getJoinBuildKeyCount();
nullJoinProbeKeyCount += operator.getNullJoinProbeKeyCount();
joinProbeKeyCount += operator.getJoinProbeKeyCount();
}

return new OperatorStats(
Expand Down Expand Up @@ -784,7 +812,9 @@ public OperatorStats add(Iterable<OperatorStats> operators)
(OperatorInfo) base,
runtimeStats,
nullJoinBuildKeyCount,
joinBuildKeyCount);
joinBuildKeyCount,
nullJoinProbeKeyCount,
joinProbeKeyCount);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -850,6 +880,8 @@ public OperatorStats summarize()
info,
runtimeStats,
nullJoinBuildKeyCount,
joinBuildKeyCount);
joinBuildKeyCount,
nullJoinProbeKeyCount,
joinProbeKeyCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ private static OperatorStats convertToThriftOperatorStats(OperatorStats operator
operatorStats.getRuntimeStats(),
convertToOperatorInfoUnion(operatorStats.getInfo()),
operatorStats.getNullJoinBuildKeyCount(),
operatorStats.getJoinBuildKeyCount());
operatorStats.getJoinBuildKeyCount(),
operatorStats.getNullJoinProbeKeyCount(),
operatorStats.getJoinProbeKeyCount());
}

private static MetadataUpdates convertToThriftMetadataUpdates(
Expand Down Expand Up @@ -482,7 +484,9 @@ private static OperatorStats convertFromThriftOperatorStats(OperatorStats thrift
convertToOperatorInfo(thriftOperatorStats.getInfoUnion()),
thriftOperatorStats.getRuntimeStats(),
thriftOperatorStats.getNullJoinBuildKeyCount(),
thriftOperatorStats.getJoinBuildKeyCount());
thriftOperatorStats.getJoinBuildKeyCount(),
thriftOperatorStats.getNullJoinProbeKeyCount(),
thriftOperatorStats.getJoinProbeKeyCount());
}

private static MetadataUpdates convertFromThriftMetadataUpdates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private static class Rewriter
extends SimplePlanRewriter<Set<VariableReferenceExpression>>
{
private static final double NULL_BUILD_KEY_COUNT_THRESHOLD = 100_000;
private static final double NULL_PROBE_KEY_COUNT_THRESHOLD = 100_000;
private static final String LEFT_PREFIX = "l";
private static final String RIGHT_PREFIX = "r";
private final Session session;
Expand Down Expand Up @@ -286,11 +287,7 @@ public PlanNode visitJoin(JoinNode joinNode, RewriteContext<Set<VariableReferenc
PlanNode rewrittenLeft = context.rewrite(joinNode.getLeft(), context.get());
PlanNode rewrittenRight = context.rewrite(joinNode.getRight(), context.get());

JoinNodeStatsEstimate joinEstimate = statsProvider.getStats(joinNode).getJoinNodeStatsEstimate();
boolean isValidEstimate = !Double.isNaN(joinEstimate.getJoinBuildKeyCount()) && !Double.isNaN(joinEstimate.getNullJoinBuildKeyCount());
boolean enabledByCostModel = isValidEstimate && strategy.equals(COST_BASED) && joinEstimate.getNullJoinBuildKeyCount() > NULL_BUILD_KEY_COUNT_THRESHOLD
&& joinEstimate.getNullJoinBuildKeyCount() / joinEstimate.getJoinBuildKeyCount() > getRandomizeOuterJoinNullKeyNullRatioThreshold(session);

boolean enabledByCostModel = strategy.equals(COST_BASED) && hasNullSkew(statsProvider.getStats(joinNode).getJoinNodeStatsEstimate());
List<JoinNode.EquiJoinClause> candidateEquiJoinClauses = joinNode.getCriteria().stream()
.filter(x -> isSupportedType(x.getLeft()) && isSupportedType(x.getRight()))
.filter(x -> enabledByCostModel || strategy.equals(ALWAYS) || enabledForJoinKeyFromOuterJoin(context.get(), x))
Expand Down Expand Up @@ -382,6 +379,16 @@ public PlanNode visitJoin(JoinNode joinNode, RewriteContext<Set<VariableReferenc
return newJoinNode;
}

private boolean hasNullSkew(JoinNodeStatsEstimate joinEstimate)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be based on ratio as compared to counts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but counts are better because irrespective of how big the table is you have to shuffle these anyway so in some sense it's not related to ratio per se

{
boolean isValidEstimate = !Double.isNaN(joinEstimate.getJoinBuildKeyCount()) && !Double.isNaN(joinEstimate.getNullJoinBuildKeyCount())
&& !Double.isNaN(joinEstimate.getJoinProbeKeyCount()) && !Double.isNaN(joinEstimate.getNullJoinProbeKeyCount());
return isValidEstimate && ((joinEstimate.getNullJoinBuildKeyCount() > NULL_BUILD_KEY_COUNT_THRESHOLD
&& joinEstimate.getNullJoinBuildKeyCount() / joinEstimate.getJoinBuildKeyCount() > getRandomizeOuterJoinNullKeyNullRatioThreshold(session))
|| (joinEstimate.getNullJoinProbeKeyCount() > NULL_PROBE_KEY_COUNT_THRESHOLD
&& joinEstimate.getNullJoinProbeKeyCount() / joinEstimate.getJoinProbeKeyCount() > getRandomizeOuterJoinNullKeyNullRatioThreshold(session)));
}

private RowExpression randomizeJoinKey(RowExpression keyExpression, String prefix)
{
int partitionCount = getHashPartitionCount(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ public HashCollisionPlanNodeStats(
Map<String, OperatorInputStats> operatorInputStats,
long planNodeNullJoinBuildKeyCount,
long planNodeJoinBuildKeyCount,
long planNodeNullJoinProbeKeyCount,
long planNodeJoinProbeKeyCount,
Map<String, OperatorHashCollisionsStats> operatorHashCollisionsStats)
{
super(planNodeId, planNodeScheduledTime, planNodeCpuTime, planNodeInputPositions, planNodeInputDataSize, planNodeRawInputPositions, planNodeRawInputDataSize,
planNodeOutputPositions, planNodeOutputDataSize, operatorInputStats, planNodeNullJoinBuildKeyCount, planNodeJoinBuildKeyCount);
planNodeOutputPositions, planNodeOutputDataSize, operatorInputStats, planNodeNullJoinBuildKeyCount, planNodeJoinBuildKeyCount, planNodeNullJoinProbeKeyCount, planNodeJoinProbeKeyCount);
this.operatorHashCollisionsStats = requireNonNull(operatorHashCollisionsStats, "operatorHashCollisionsStats is null");
}

Expand Down Expand Up @@ -104,6 +106,8 @@ public PlanNodeStats mergeWith(PlanNodeStats other)
merged.operatorInputStats,
merged.getPlanNodeNullJoinBuildKeyCount(),
merged.getPlanNodeJoinBuildKeyCount(),
merged.getPlanNodeNullJoinProbeKeyCount(),
merged.getPlanNodeJoinProbeKeyCount(),
operatorHashCollisionsStats);
}
}
Loading