Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import static com.facebook.presto.common.type.Decimals.MAX_SHORT_PRECISION;
import static io.airlift.slice.SizeOf.SIZE_OF_LONG;

final class ShortDecimalType
public final class ShortDecimalType
extends DecimalType
{
ShortDecimalType(int precision, int scale)
public ShortDecimalType(int precision, int scale)
{
super(precision, scale, long.class);
validatePrecisionScale(precision, scale, MAX_SHORT_PRECISION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public BuiltInFunctionHandle(@JsonProperty("signature") Signature signature)
checkArgument(signature.getTypeVariableConstraints().isEmpty(), "%s has unbound type parameters", signature);
}

@Override
@JsonProperty
public Signature getSignature()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex;
import static com.facebook.presto.operator.SyntheticAddress.encodeSyntheticAddress;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.units.DataSize.Unit.BYTE;
Expand Down Expand Up @@ -443,6 +444,12 @@ public PagesHashStrategy createPagesHashStrategy(List<Integer> joinChannels, Opt
groupByUsesEqualTo);
}

public PagesIndexComparator createChannelComparator(int leftChannel, int rightChannel, SortOrder sortOrder)
{
checkArgument(types.get(leftChannel).equals(types.get(rightChannel)), "comparing channels of different types: %s and %s", types.get(leftChannel), types.get(rightChannel));
return new SimpleChannelComparator(leftChannel, rightChannel, types.get(leftChannel), sortOrder);
}

public LookupSourceSupplier createLookupSourceSupplier(
Session session,
List<Integer> joinChannels,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.PrestoException;

import static com.facebook.presto.operator.SyntheticAddress.decodePosition;
import static com.facebook.presto.operator.SyntheticAddress.decodeSliceIndex;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;

public class SimpleChannelComparator
implements PagesIndexComparator
{
private final int leftChannel;
private final int rightChannel;
private final SortOrder sortOrder;
private final Type sortType;

public SimpleChannelComparator(int leftChannel, int rightChannel, Type sortType, SortOrder sortOrder)
{
this.leftChannel = leftChannel;
this.rightChannel = rightChannel;
this.sortOrder = requireNonNull(sortOrder, "sortOrder is null");
this.sortType = requireNonNull(sortType, "sortType is null.");
}

@Override
public int compareTo(PagesIndex pagesIndex, int leftPosition, int rightPosition)
{
long leftPageAddress = pagesIndex.getValueAddresses().get(leftPosition);
int leftBlockIndex = decodeSliceIndex(leftPageAddress);
int leftBlockPosition = decodePosition(leftPageAddress);

long rightPageAddress = pagesIndex.getValueAddresses().get(rightPosition);
int rightBlockIndex = decodeSliceIndex(rightPageAddress);
int rightBlockPosition = decodePosition(rightPageAddress);

try {
Block leftBlock = pagesIndex.getChannel(leftChannel).get(leftBlockIndex);
Block rightBlock = pagesIndex.getChannel(rightChannel).get(rightBlockIndex);
int result = sortOrder.compareBlockValue(sortType, leftBlock, leftBlockPosition, rightBlock, rightBlockPosition);

// sortOrder compares block values and adjusts the result by ASC and DESC. SimpleChannelComparator should
// return the simple comparison, so reverse it if it is DESC.
return sortOrder.isAscending() ? result : -result;
}
catch (Throwable throwable) {
throwIfUnchecked(throwable);
throw new PrestoException(GENERIC_INTERNAL_ERROR, throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.operator.WorkProcessor.ProcessState;
import com.facebook.presto.operator.WorkProcessor.Transformation;
import com.facebook.presto.operator.WorkProcessor.TransformationState;
import com.facebook.presto.operator.window.FrameInfo;
import com.facebook.presto.operator.window.FramedWindowFunction;
import com.facebook.presto.operator.window.WindowPartition;
import com.facebook.presto.spi.plan.PlanNodeId;
Expand All @@ -29,6 +30,7 @@
import com.facebook.presto.sql.gen.OrderingCompiler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.PeekingIterator;
Expand All @@ -38,6 +40,8 @@
import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -47,6 +51,9 @@
import static com.facebook.presto.common.block.SortOrder.ASC_NULLS_LAST;
import static com.facebook.presto.operator.SpillingUtils.checkSpillSucceeded;
import static com.facebook.presto.operator.WorkProcessor.TransformationState.needsMoreData;
import static com.facebook.presto.sql.planner.plan.WindowNode.Frame.BoundType.FOLLOWING;
import static com.facebook.presto.sql.planner.plan.WindowNode.Frame.BoundType.PRECEDING;
import static com.facebook.presto.sql.planner.plan.WindowNode.Frame.WindowType.RANGE;
import static com.facebook.presto.util.MergeSortedPages.mergeSortedPages;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkPositionIndex;
Expand Down Expand Up @@ -270,7 +277,9 @@ public WindowOperator(
preGroupedChannels,
unGroupedPartitionChannels,
preSortedChannels,
sortChannels);
sortChannels,
sortOrder,
windowFunctionDefinitions);

if (spillEnabled) {
PagesIndexWithHashStrategies mergedPagesIndexWithHashStrategies = new PagesIndexWithHashStrategies(
Expand All @@ -282,7 +291,9 @@ public WindowOperator(
ImmutableList.of(),
// merged pages are pre sorted on all sort channels
sortChannels,
sortChannels);
sortChannels,
sortOrder,
windowFunctionDefinitions);

this.spillablePagesToPagesIndexes = Optional.of(new SpillablePagesToPagesIndexes(
inMemoryPagesIndexWithHashStrategies,
Expand Down Expand Up @@ -381,6 +392,7 @@ private static class PagesIndexWithHashStrategies
final PagesHashStrategy preSortedPartitionHashStrategy;
final PagesHashStrategy peerGroupHashStrategy;
final int[] preGroupedPartitionChannels;
final Map<FrameBoundKey, PagesIndexComparator> frameBoundComparators;

PagesIndexWithHashStrategies(
PagesIndex.Factory pagesIndexFactory,
Expand All @@ -389,14 +401,87 @@ private static class PagesIndexWithHashStrategies
List<Integer> preGroupedPartitionChannels,
List<Integer> unGroupedPartitionChannels,
List<Integer> preSortedChannels,
List<Integer> sortChannels)
List<Integer> sortChannels,
List<SortOrder> sortOrder,
List<WindowFunctionDefinition> windowFunctionDefinitions)
{
this.pagesIndex = pagesIndexFactory.newPagesIndex(sourceTypes, expectedPositions);
this.preGroupedPartitionHashStrategy = pagesIndex.createPagesHashStrategy(preGroupedPartitionChannels, OptionalInt.empty());
this.unGroupedPartitionHashStrategy = pagesIndex.createPagesHashStrategy(unGroupedPartitionChannels, OptionalInt.empty());
this.preSortedPartitionHashStrategy = pagesIndex.createPagesHashStrategy(preSortedChannels, OptionalInt.empty());
this.peerGroupHashStrategy = pagesIndex.createPagesHashStrategy(sortChannels, OptionalInt.empty());
this.preGroupedPartitionChannels = Ints.toArray(preGroupedPartitionChannels);
this.frameBoundComparators = createFrameBoundComparators(pagesIndex, windowFunctionDefinitions, sortOrder);
}
}

/**
* Create comparators necessary for seeking frame start or frame end for window functions with frame type RANGE.
* Whenever a frame bound is specified as RANGE X PRECEDING or RANGE X FOLLOWING,
* a dedicated comparator is created to compare sort key values with expected frame bound values.
*/
private static Map<FrameBoundKey, PagesIndexComparator> createFrameBoundComparators(PagesIndex pagesIndex,
List<WindowFunctionDefinition> windowFunctionDefinitions,
List<SortOrder> sortOrders)
{
ImmutableMap.Builder<FrameBoundKey, PagesIndexComparator> builder = ImmutableMap.builder();

for (int i = 0; i < windowFunctionDefinitions.size(); i++) {
FrameInfo frameInfo = windowFunctionDefinitions.get(i).getFrameInfo();
if (frameInfo.getType() == RANGE) {
if (frameInfo.getStartType() == PRECEDING || frameInfo.getStartType() == FOLLOWING) {
// Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY
SortOrder sortOrder = sortOrders.get(0);
PagesIndexComparator comparator = pagesIndex.createChannelComparator(frameInfo.getSortKeyChannelForStartComparison(), frameInfo.getStartChannel(), sortOrder);
builder.put(new FrameBoundKey(i, FrameBoundKey.Type.START), comparator);
}
if (frameInfo.getEndType() == PRECEDING || frameInfo.getEndType() == FOLLOWING) {
// Window frame of type RANGE PRECEDING or FOLLOWING requires single sort item in ORDER BY
SortOrder sortOrder = sortOrders.get(0);
PagesIndexComparator comparator = pagesIndex.createChannelComparator(frameInfo.getSortKeyChannelForEndComparison(), frameInfo.getEndChannel(), sortOrder);
builder.put(new FrameBoundKey(i, FrameBoundKey.Type.END), comparator);
}
}
}

return builder.build();
}

public static class FrameBoundKey
{
private final int functionIndex;
private final Type type;

public enum Type
{
START,
END;
}

public FrameBoundKey(int functionIndex, Type type)
{
this.functionIndex = functionIndex;
this.type = requireNonNull(type, "type is null");
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FrameBoundKey that = (FrameBoundKey) o;
return functionIndex == that.functionIndex &&
type == that.type;
}

@Override
public int hashCode()
{
return Objects.hash(functionIndex, type);
}
}

Expand Down Expand Up @@ -501,7 +586,14 @@ public ProcessState<WindowPartition> process()

int partitionEnd = findGroupEnd(pagesIndex, pagesIndexWithHashStrategies.unGroupedPartitionHashStrategy, partitionStart);

WindowPartition partition = new WindowPartition(pagesIndex, partitionStart, partitionEnd, outputChannels, windowFunctions, pagesIndexWithHashStrategies.peerGroupHashStrategy);
WindowPartition partition = new WindowPartition(
pagesIndex,
partitionStart,
partitionEnd,
outputChannels,
windowFunctions,
pagesIndexWithHashStrategies.peerGroupHashStrategy,
pagesIndexWithHashStrategies.frameBoundComparators);
windowInfo.addPartition(partition);
partitionStart = partitionEnd;
return ProcessState.ofResult(partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.sql.planner.plan.WindowNode.Frame.BoundType;
import com.facebook.presto.sql.planner.plan.WindowNode.Frame.WindowType;
import com.facebook.presto.sql.tree.SortItem.Ordering;

import java.util.Objects;
import java.util.Optional;
Expand All @@ -27,21 +28,33 @@ public class FrameInfo
private final WindowType type;
private final BoundType startType;
private final int startChannel;
private final int sortKeyChannelForStartComparison;
private final BoundType endType;
private final int endChannel;
private final int sortKeyChannelForEndComparison;
private final int sortKeyChannel;
private final Optional<Ordering> ordering;

public FrameInfo(
WindowType type,
BoundType startType,
Optional<Integer> startChannel,
Optional<Integer> sortKeyChannelForStartComparison,
BoundType endType,
Optional<Integer> endChannel)
Optional<Integer> endChannel,
Optional<Integer> sortKeyChannelForEndComparison,
Optional<Integer> sortKeyChannel,
Optional<Ordering> ordering)
{
this.type = requireNonNull(type, "type is null");
this.startType = requireNonNull(startType, "startType is null");
this.startChannel = requireNonNull(startChannel, "startChannel is null").orElse(-1);
this.sortKeyChannelForStartComparison = requireNonNull(sortKeyChannelForStartComparison, "sortKeyChannelForStartComparison is null").orElse(-1);
this.endType = requireNonNull(endType, "endType is null");
this.endChannel = requireNonNull(endChannel, "endChannel is null").orElse(-1);
this.sortKeyChannelForEndComparison = requireNonNull(sortKeyChannelForEndComparison, "sortKeyChannelForEndComparison is null").orElse(-1);
this.sortKeyChannel = requireNonNull(sortKeyChannel, "sortKeyChannel is null").orElse(-1);
this.ordering = requireNonNull(ordering, "ordering is null");
}

public WindowType getType()
Expand All @@ -59,6 +72,11 @@ public int getStartChannel()
return startChannel;
}

public int getSortKeyChannelForStartComparison()
{
return sortKeyChannelForStartComparison;
}

public BoundType getEndType()
{
return endType;
Expand All @@ -69,10 +87,25 @@ public int getEndChannel()
return endChannel;
}

public int getSortKeyChannelForEndComparison()
{
return sortKeyChannelForEndComparison;
}

public int getSortKeyChannel()
{
return sortKeyChannel;
}

public Optional<Ordering> getOrdering()
{
return ordering;
}

@Override
public int hashCode()
{
return Objects.hash(type, startType, startChannel, endType, endChannel);
return Objects.hash(type, startType, startChannel, sortKeyChannelForStartComparison, endType, endChannel, sortKeyChannelForEndComparison, sortKeyChannel, ordering);
}

@Override
Expand All @@ -90,9 +123,13 @@ public boolean equals(Object obj)

return Objects.equals(this.type, other.type) &&
Objects.equals(this.startType, other.startType) &&
Objects.equals(this.sortKeyChannelForStartComparison, other.sortKeyChannelForStartComparison) &&
Objects.equals(this.startChannel, other.startChannel) &&
Objects.equals(this.endType, other.endType) &&
Objects.equals(this.endChannel, other.endChannel);
Objects.equals(this.endChannel, other.endChannel) &&
Objects.equals(this.sortKeyChannelForEndComparison, other.sortKeyChannelForEndComparison) &&
Objects.equals(this.sortKeyChannel, other.sortKeyChannel) &&
Objects.equals(this.ordering, other.ordering);
}

@Override
Expand All @@ -102,8 +139,12 @@ public String toString()
.add("type", type)
.add("startType", startType)
.add("startChannel", startChannel)
.add("sortKeyChannelForStartComparison", sortKeyChannelForStartComparison)
.add("endType", endType)
.add("endChannel", endChannel)
.add("sortKeyChannelForEndComparison", sortKeyChannelForEndComparison)
.add("sortKeyChannel", sortKeyChannel)
.add("ordering", ordering)
.toString();
}
}
Loading