Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ public class Analysis

private final Map<NodeRef<Expression>, Type> types = new LinkedHashMap<>();
private final Map<NodeRef<Expression>, Type> coercions = new LinkedHashMap<>();
// Coercions needed for window function frame of type RANGE.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// These are coercions for the sort key, needed for frame bound calculation, identified by frame range offset expression.
// Frame definition might contain two different offset expressions (for start and end), each requiring different coercion of the sort key.
private final Map<NodeRef<Expression>, Type> sortKeyCoercionsForFrameBoundCalculation = new LinkedHashMap<>();
// Coercions needed for window function frame of type RANGE.
// These are coercions for the sort key, needed for comparison of the sort key with precomputed frame bound, identified by frame range offset expression.
private final Map<NodeRef<Expression>, Type> sortKeyCoercionsForFrameBoundComparison = new LinkedHashMap<>();
// Functions for calculating frame bounds for frame of type RANGE, identified by frame range offset expression.
private final Map<NodeRef<Expression>, FunctionHandle> frameBoundCalculations = new LinkedHashMap<>();
private final Set<NodeRef<Expression>> typeOnlyCoercions = new LinkedHashSet<>();
private final Map<NodeRef<Relation>, List<Type>> relationCoercions = new LinkedHashMap<>();
private final Map<NodeRef<FunctionCall>, FunctionHandle> functionHandles = new LinkedHashMap<>();
Expand Down Expand Up @@ -553,10 +562,36 @@ public void addCoercion(Expression expression, Type type, boolean isTypeOnlyCoer
}
}

public void addCoercions(Map<NodeRef<Expression>, Type> coercions, Set<NodeRef<Expression>> typeOnlyCoercions)
public void addCoercions(
Map<NodeRef<Expression>, Type> coercions,
Set<NodeRef<Expression>> typeOnlyCoercions,
Map<NodeRef<Expression>, Type> sortKeyCoercionsForFrameBoundCalculation,
Map<NodeRef<Expression>, Type> sortKeyCoercionsForFrameBoundComparison)
{
this.coercions.putAll(coercions);
this.typeOnlyCoercions.addAll(typeOnlyCoercions);
this.sortKeyCoercionsForFrameBoundCalculation.putAll(sortKeyCoercionsForFrameBoundCalculation);
this.sortKeyCoercionsForFrameBoundComparison.putAll(sortKeyCoercionsForFrameBoundComparison);
}

public Type getSortKeyCoercionForFrameBoundCalculation(Expression frameOffset)
{
return sortKeyCoercionsForFrameBoundCalculation.get(NodeRef.of(frameOffset));
}

public Type getSortKeyCoercionForFrameBoundComparison(Expression frameOffset)
{
return sortKeyCoercionsForFrameBoundComparison.get(NodeRef.of(frameOffset));
}

public void addFrameBoundCalculations(Map<NodeRef<Expression>, FunctionHandle> frameBoundCalculations)
{
this.frameBoundCalculations.putAll(frameBoundCalculations);
}

public FunctionHandle getFrameBoundCalculation(Expression frameOffset)
{
return frameBoundCalculations.get(NodeRef.of(frameOffset));
}

public Expression getHaving(QuerySpecification query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public enum SemanticErrorCode
MISSING_ATTRIBUTE,
INVALID_ORDINAL,
INVALID_LITERAL,
MISSING_ORDER_BY,
INVALID_ORDER_BY,
Comment on lines +53 to +54
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corresponding to changes here https://github.com/trinodb/trino/pull/5639/files#diff-01bc66a3163352eb4ad49f6100ca0652ca7f53e6a32d9795deed8e9f6f187b02

INVALID_ORDER_BY is added in the ported PR, MISSING_ORDER_BY was added before this ported PR and not available in our codebase, hence add here.


FUNCTION_NOT_FOUND,
INVALID_FUNCTION_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import static com.facebook.presto.common.type.Decimals.MAX_SHORT_PRECISION;
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;

final class ShortDecimalType
public final class ShortDecimalType
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not from ported PR.
Need this change, as it's accessed in test TestWindowFrameRange

extends DecimalType
{
ShortDecimalType(int precision, int scale)
public ShortDecimalType(int precision, int scale)
{
super(precision, scale, long.class);
validatePrecisionScale(precision, scale, MAX_SHORT_PRECISION);
Expand Down
13 changes: 9 additions & 4 deletions presto-docs/src/main/sphinx/functions/window.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ A ``frame`` is one of::
``frame_start`` and ``frame_end`` can be any of::

UNBOUNDED PRECEDING
expression PRECEDING -- only allowed in ROWS mode
expression PRECEDING
CURRENT ROW
expression FOLLOWING -- only allowed in ROWS mode
expression FOLLOWING
UNBOUNDED FOLLOWING


Expand All @@ -49,10 +49,15 @@ The window definition has 3 components:
the first peer row of the current row, while a frame end of ``CURRENT ROW`` refers to
the last peer row of the current row.

Frame starts and ends of ``expression PRECEDING`` or ``expression FOLLOWING`` are currently
only allowed in ``ROWS`` mode. They define the start or end of the frame as the specified number
In ``ROWS`` mode, frame starts and ends of ``expression PRECEDING`` or ``expression FOLLOWING``
define the start or end of the frame as the specified number
of rows before or after the current row. The ``expression`` must be of type ``INTEGER``.

In ``RANGE`` mode, frame starts and ends of ``expression PRECEDING`` or ``expression FOLLOWING``
define the start or end of the frame as the value difference of the sort key from
the current row. The sort key must either be the same type of ``expression`` or can be coerced to the
same type as ``expression``.

If no frame is specified, a default frame of ``RANGE UNBOUNDED PRECEDING`` is used.

Examples
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex;
import static com.facebook.presto.operator.SyntheticAddress.encodeSyntheticAddress;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.units.DataSize.Unit.BYTE;
Expand Down Expand Up @@ -443,6 +444,12 @@ public PagesHashStrategy createPagesHashStrategy(List<Integer> joinChannels, Opt
groupByUsesEqualTo);
}

public PagesIndexComparator createChannelComparator(int leftChannel, int rightChannel, SortOrder sortOrder)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
checkArgument(types.get(leftChannel).equals(types.get(rightChannel)), "comparing channels of different types: %s and %s", types.get(leftChannel), types.get(rightChannel));
return new SimpleChannelComparator(leftChannel, rightChannel, types.get(leftChannel), sortOrder);
}

public LookupSourceSupplier createLookupSourceSupplier(
Session session,
List<Integer> joinChannels,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.PrestoException;

import static com.facebook.presto.operator.SyntheticAddress.decodePosition;
import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;

public class SimpleChannelComparator
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ported from https://github.com/trinodb/trino/pull/5639/files#diff-5e0c59beecd68b5003a8b1e38f114aeeaf754d7d92f145a8c95ede66076cc4fb.

The difference is that, in the ported PR, it's using BlockPositionComparison which compares two block positions in ascending order, which is not available in our codebase. Here use the compareBlockValue function of sort order.

implements PagesIndexComparator
{
private final int leftChannel;
private final int rightChannel;
private final SortOrder sortOrder;
private final Type sortType;

public SimpleChannelComparator(int leftChannel, int rightChannel, Type sortType, SortOrder sortOrder)
{
this.leftChannel = leftChannel;
this.rightChannel = rightChannel;
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
this.sortType = requireNonNull(sortType, "sortType is null.");
}

@Override
public int compareTo(PagesIndex pagesIndex, int leftPosition, int rightPosition)
{
long leftPageAddress = pagesIndex.getValueAddresses().get(leftPosition);
int leftBlockIndex = decodeSliceIndex(leftPageAddress);
int leftBlockPosition = decodePosition(leftPageAddress);

long rightPageAddress = pagesIndex.getValueAddresses().get(rightPosition);
int rightBlockIndex = decodeSliceIndex(rightPageAddress);
int rightBlockPosition = decodePosition(rightPageAddress);

try {
Block leftBlock = pagesIndex.getChannel(leftChannel).get(leftBlockIndex);
Block rightBlock = pagesIndex.getChannel(rightChannel).get(rightBlockIndex);
int result = sortOrder.compareBlockValue(sortType, leftBlock, leftBlockPosition, rightBlock, rightBlockPosition);

// sortOrder compares block values and adjusts the result by ASC and DESC. SimpleChannelComparator should
// return the simple comparison, so reverse it if it is DESC.
return sortOrder.isAscending() ? result : -result;
}
catch (Throwable throwable) {
throwIfUnchecked(throwable);
throw new PrestoException(GENERIC_INTERNAL_ERROR, throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.operator.WorkProcessor.ProcessState;
import com.facebook.presto.operator.WorkProcessor.Transformation;
import com.facebook.presto.operator.WorkProcessor.TransformationState;
import com.facebook.presto.operator.window.FrameInfo;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import com.facebook.presto.operator.window.FramedWindowFunction;
import com.facebook.presto.operator.window.WindowPartition;
import com.facebook.presto.spi.plan.PlanNodeId;
Expand All @@ -29,6 +30,7 @@
import com.facebook.presto.sql.gen.OrderingCompiler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.PeekingIterator;
Expand All @@ -38,6 +40,8 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -47,6 +51,9 @@
import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_LAST;
import static com.facebook.presto.operator.SpillingUtils.checkSpillSucceeded;
import static com.facebook.presto.operator.WorkProcessor.TransformationState.needsMoreData;
import static com.facebook.presto.sql.planner.plan.WindowNode.Frame.BoundType.FOLLOWING;
import static com.facebook.presto.sql.planner.plan.WindowNode.Frame.BoundType.PRECEDING;
import static com.facebook.presto.sql.planner.plan.WindowNode.Frame.WindowType.RANGE;
import static com.facebook.presto.util.MergeSortedPages.mergeSortedPages;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndex;
Expand Down Expand Up @@ -270,7 +277,9 @@ public WindowOperator(
preGroupedChannels,
unGroupedPartitionChannels,
preSortedChannels,
sortChannels);
sortChannels,
sortOrder,
windowFunctionDefinitions);

if (spillEnabled) {
PagesIndexWithHashStrategies mergedPagesIndexWithHashStrategies = new PagesIndexWithHashStrategies(
Expand All @@ -282,7 +291,9 @@ public WindowOperator(
ImmutableList.of(),
// merged pages are pre sorted on all sort channels
sortChannels,
sortChannels);
sortChannels,
sortOrder,
windowFunctionDefinitions);

this.spillablePagesToPagesIndexes = Optional.of(new SpillablePagesToPagesIndexes(
inMemoryPagesIndexWithHashStrategies,
Expand Down Expand Up @@ -381,6 +392,7 @@ private static class PagesIndexWithHashStrategies
final PagesHashStrategy preSortedPartitionHashStrategy;
final PagesHashStrategy peerGroupHashStrategy;
final int[] preGroupedPartitionChannels;
final Map<FrameBoundKey, PagesIndexComparator> frameBoundComparators;

PagesIndexWithHashStrategies(
PagesIndex.Factory pagesIndexFactory,
Expand All @@ -389,14 +401,89 @@ private static class PagesIndexWithHashStrategies
List<Integer> preGroupedPartitionChannels,
List<Integer> unGroupedPartitionChannels,
List<Integer> preSortedChannels,
List<Integer> sortChannels)
List<Integer> sortChannels,
List<SortOrder> sortOrder,
List<WindowFunctionDefinition> windowFunctionDefinitions)
{
this.pagesIndex = pagesIndexFactory.newPagesIndex(sourceTypes, expectedPositions);
this.preGroupedPartitionHashStrategy = pagesIndex.createPagesHashStrategy(preGroupedPartitionChannels, OptionalInt.empty());
this.unGroupedPartitionHashStrategy = pagesIndex.createPagesHashStrategy(unGroupedPartitionChannels, OptionalInt.empty());
this.preSortedPartitionHashStrategy = pagesIndex.createPagesHashStrategy(preSortedChannels, OptionalInt.empty());
this.peerGroupHashStrategy = pagesIndex.createPagesHashStrategy(sortChannels, OptionalInt.empty());
this.preGroupedPartitionChannels = Ints.toArray(preGroupedPartitionChannels);
this.frameBoundComparators = createFrameBoundComparators(pagesIndex, windowFunctionDefinitions, sortOrder);
}
}

/**
* Create comparators necessary for seeking frame start or frame end for window functions with frame type RANGE.
* Whenever a frame bound is specified as RANGE X PRECEDING or RANGE X FOLLOWING,
* a dedicated comparator is created to compare sort key values with expected frame bound values.
*/
private static Map<FrameBoundKey, PagesIndexComparator> createFrameBoundComparators(PagesIndex pagesIndex,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ported function. Only difference is that we also have sortOrders passed in, because of the different implementation of SimpleChannelComparator.

List<WindowFunctionDefinition> windowFunctionDefinitions,
List<SortOrder> sortOrders)
{
ImmutableMap.Builder<FrameBoundKey, PagesIndexComparator> builder = ImmutableMap.builder();

for (int i = 0; i < windowFunctionDefinitions.size(); i++) {
FrameInfo frameInfo = windowFunctionDefinitions.get(i).getFrameInfo();
if (frameInfo.getType() == RANGE) {
if (frameInfo.getStartType() == PRECEDING || frameInfo.getStartType() == FOLLOWING) {
// Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY
checkState(sortOrders != null && sortOrders.size() == 1, "Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY.");
SortOrder sortOrder = sortOrders.get(0);
PagesIndexComparator comparator = pagesIndex.createChannelComparator(frameInfo.getSortKeyChannelForStartComparison(), frameInfo.getStartChannel(), sortOrder);
builder.put(new FrameBoundKey(i, FrameBoundKey.Type.START), comparator);
}
if (frameInfo.getEndType() == PRECEDING || frameInfo.getEndType() == FOLLOWING) {
// Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY
checkState(sortOrders != null && sortOrders.size() == 1, "Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY.");
SortOrder sortOrder = sortOrders.get(0);
PagesIndexComparator comparator = pagesIndex.createChannelComparator(frameInfo.getSortKeyChannelForEndComparison(), frameInfo.getEndChannel(), sortOrder);
builder.put(new FrameBoundKey(i, FrameBoundKey.Type.END), comparator);
}
}
}

return builder.build();
}

public static class FrameBoundKey
{
private final int functionIndex;
private final Type type;

public enum Type
{
START,
END;
}

public FrameBoundKey(int functionIndex, Type type)
{
this.functionIndex = functionIndex;
this.type = requireNonNull(type, "type is null");
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FrameBoundKey that = (FrameBoundKey) o;
return functionIndex == that.functionIndex &&
type == that.type;
}

@Override
public int hashCode()
{
return Objects.hash(functionIndex, type);
}
}

Expand Down Expand Up @@ -501,7 +588,14 @@ public ProcessState<WindowPartition> process()

int partitionEnd = findGroupEnd(pagesIndex, pagesIndexWithHashStrategies.unGroupedPartitionHashStrategy, partitionStart);

WindowPartition partition = new WindowPartition(pagesIndex, partitionStart, partitionEnd, outputChannels, windowFunctions, pagesIndexWithHashStrategies.peerGroupHashStrategy);
WindowPartition partition = new WindowPartition(
pagesIndex,
partitionStart,
partitionEnd,
outputChannels,
windowFunctions,
pagesIndexWithHashStrategies.peerGroupHashStrategy,
pagesIndexWithHashStrategies.frameBoundComparators);
windowInfo.addPartition(partition);
partitionStart = partitionEnd;
return ProcessState.ofResult(partition);
Expand Down
Loading