Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static com.google.common.base.Defaults.defaultValue;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.prestosql.spi.StandardErrorCode.EXCEEDED_FUNCTION_MEMORY_LIMIT;
import static io.prestosql.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.prestosql.spi.type.TypeUtils.readNativeValue;
import static io.prestosql.type.TypeUtils.hashPosition;
import static io.prestosql.type.TypeUtils.positionEqualsPosition;
import static io.prestosql.util.Failures.internalError;
import static it.unimi.dsi.fastutil.HashCommon.arraySize;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -43,6 +49,7 @@ public class TypedSet
static final long FOUR_MEGABYTES = MAX_FUNCTION_MEMORY.toBytes();

private final Type elementType;
private final Optional<MethodHandle> elementIsDistinctFrom;
private final IntArrayList blockPositionByHash;
private final BlockBuilder elementBlock;
private final String functionName;
Expand All @@ -61,13 +68,20 @@ public class TypedSet

public TypedSet(Type elementType, int expectedSize, String functionName)
{
this(elementType, elementType.createBlockBuilder(null, expectedSize), expectedSize, functionName);
// TODO revise other usages of TypedSet and determine whether they should use equality or distinctness semantics
this(elementType, Optional.empty(), elementType.createBlockBuilder(null, expectedSize), expectedSize, functionName);
}

public TypedSet(Type elementType, BlockBuilder blockBuilder, int expectedSize, String functionName)
public TypedSet(Type elementType, MethodHandle elementIsDistinctFrom, int expectedSize, String functionName)
{
this(elementType, Optional.of(elementIsDistinctFrom), elementType.createBlockBuilder(null, expectedSize), expectedSize, functionName);
}

public TypedSet(Type elementType, Optional<MethodHandle> elementIsDistinctFrom, BlockBuilder blockBuilder, int expectedSize, String functionName)
{
checkArgument(expectedSize >= 0, "expectedSize must not be negative");
this.elementType = requireNonNull(elementType, "elementType must not be null");
this.elementIsDistinctFrom = requireNonNull(elementIsDistinctFrom, "elementIsDistinctFrom is null");
this.elementBlock = requireNonNull(blockBuilder, "blockBuilder must not be null");
this.functionName = functionName;

Expand Down Expand Up @@ -140,19 +154,36 @@ private int getHashPositionOfElement(Block block, int position)
int hashPosition = getMaskedHash(hashPosition(elementType, block, position));
while (true) {
int blockPosition = blockPositionByHash.get(hashPosition);
// Doesn't have this element
if (blockPosition == EMPTY_SLOT) {
// Doesn't have this element
return hashPosition;
}
// Already has this element
if (positionEqualsPosition(elementType, elementBlock, blockPosition, block, position)) {
if (isContainedAt(block, position, blockPosition)) {
// Already has this element
return hashPosition;
}

hashPosition = getMaskedHash(hashPosition + 1);
}
}

/**
 * Tells whether the element at {@code position} of {@code block} matches the element
 * already stored at {@code atPosition} of {@code elementBlock}.
 *
 * <p>When an {@code elementIsDistinctFrom} handle was supplied, two elements match
 * exactly when the handle reports them as not distinct. Otherwise the check is
 * delegated to {@code positionEqualsPosition}.
 *
 * @param block block holding the candidate element
 * @param position position of the candidate element within {@code block}
 * @param atPosition position of the stored element within {@code elementBlock}
 * @return {@code true} if the candidate matches the stored element
 */
private boolean isContainedAt(Block block, int position, int atPosition)
{
    // No distinctness operator supplied: use the type's position equality.
    if (!elementIsDistinctFrom.isPresent()) {
        return positionEqualsPosition(elementType, elementBlock, atPosition, block, position);
    }

    // The handle is invoked with (value, isNull) pairs; a null element is passed as
    // the default value of the type's Java representation together with its null flag.
    boolean storedIsNull = elementBlock.isNull(atPosition);
    Object storedValue;
    if (storedIsNull) {
        storedValue = defaultValue(elementType.getJavaType());
    }
    else {
        storedValue = readNativeValue(elementType, elementBlock, atPosition);
    }

    boolean candidateIsNull = block.isNull(position);
    Object candidateValue;
    if (candidateIsNull) {
        candidateValue = defaultValue(elementType.getJavaType());
    }
    else {
        candidateValue = readNativeValue(elementType, block, position);
    }

    boolean distinct;
    try {
        distinct = (boolean) elementIsDistinctFrom.get().invoke(storedValue, storedIsNull, candidateValue, candidateIsNull);
    }
    catch (Throwable t) {
        throw internalError(t);
    }
    // "Contained" is the negation of "distinct".
    return !distinct;
}

private void addNewElement(int hashPosition, Block block, int position)
{
elementType.appendTo(block, position, elementBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.function.Description;
import io.prestosql.spi.function.OperatorDependency;
import io.prestosql.spi.function.ScalarFunction;
import io.prestosql.spi.function.SqlType;
import io.prestosql.spi.function.TypeParameter;
import io.prestosql.spi.type.Type;
import io.prestosql.type.TypeUtils;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;

import java.lang.invoke.MethodHandle;

import static com.google.common.base.Defaults.defaultValue;
import static io.prestosql.spi.function.OperatorType.IS_DISTINCT_FROM;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.spi.type.StandardTypes.BOOLEAN;
import static io.prestosql.spi.type.TypeUtils.readNativeValue;
import static io.prestosql.util.Failures.internalError;

@ScalarFunction("array_distinct")
@Description("Remove duplicate values from the given array")
Expand All @@ -43,22 +50,34 @@ public ArrayDistinctFunction(@TypeParameter("E") Type elementType)

@TypeParameter("E")
@SqlType("array(E)")
public Block distinct(@TypeParameter("E") Type type, @SqlType("array(E)") Block array)
public Block distinct(
@TypeParameter("E") Type type,
@OperatorDependency(operator = IS_DISTINCT_FROM, returnType = BOOLEAN, argumentTypes = {"E", "E"}) MethodHandle elementIsDistinctFrom,
@SqlType("array(E)") Block array)
{
if (array.getPositionCount() < 2) {
return array;
}

if (array.getPositionCount() == 2) {
if (TypeUtils.positionEqualsPosition(type, array, 0, array, 1)) {
return array.getSingleValueBlock(0);
boolean firstValueNull = array.isNull(0);
Object firstValue = firstValueNull ? defaultValue(type.getJavaType()) : readNativeValue(type, array, 0);
boolean secondValueNull = array.isNull(1);
Object secondValue = secondValueNull ? defaultValue(type.getJavaType()) : readNativeValue(type, array, 1);
boolean distinct;
try {
distinct = (boolean) elementIsDistinctFrom.invoke(firstValue, firstValueNull, secondValue, secondValueNull);
}
catch (Throwable t) {
throw internalError(t);
}
else {
if (distinct) {
return array;
}
return array.getSingleValueBlock(0);
}

TypedSet typedSet = new TypedSet(type, array.getPositionCount(), "array_distinct");
TypedSet typedSet = new TypedSet(type, elementIsDistinctFrom, array.getPositionCount(), "array_distinct");
int distinctCount = 0;

if (pageBuilder.isFull()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.function.Description;
import io.prestosql.spi.function.OperatorDependency;
import io.prestosql.spi.function.ScalarFunction;
import io.prestosql.spi.function.SqlType;
import io.prestosql.spi.function.TypeParameter;
import io.prestosql.spi.type.Type;

import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static io.prestosql.spi.function.OperatorType.IS_DISTINCT_FROM;
import static io.prestosql.spi.type.StandardTypes.BOOLEAN;

@ScalarFunction("array_intersect")
@Description("Intersects elements of the two given arrays")
public final class ArrayIntersectFunction
Expand All @@ -40,6 +47,7 @@ public ArrayIntersectFunction(@TypeParameter("E") Type elementType)
@SqlType("array(E)")
public Block intersect(
@TypeParameter("E") Type type,
@OperatorDependency(operator = IS_DISTINCT_FROM, returnType = BOOLEAN, argumentTypes = {"E", "E"}) MethodHandle elementIsDistinctFrom,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding MethodHandle dependencies to methods that get called row by row is a bit of an anti-pattern. It typically means the code turns around and calls methodHandle.invoke at some point, and that call site becomes polymorphic when the function is used in a variety of contexts. A better approach is to get the dependencies during the "get implementation" call, which should assemble a MethodHandle chain or generate specialized bytecode.

It doesn't matter for this specific instance since it's hard to assemble the methodhandle chain with the TypedSet in the middle, but I'm just pointing it out as a general comment (so no need to change anything here).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left as is

@SqlType("array(E)") Block leftArray,
@SqlType("array(E)") Block rightArray)
{
Expand All @@ -60,15 +68,15 @@ public Block intersect(
pageBuilder.reset();
}

TypedSet rightTypedSet = new TypedSet(type, rightPositionCount, "array_intersect");
TypedSet rightTypedSet = new TypedSet(type, elementIsDistinctFrom, rightPositionCount, "array_intersect");
for (int i = 0; i < rightPositionCount; i++) {
rightTypedSet.add(rightArray, i);
}

BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(0);

// The intersected set can have at most rightPositionCount elements
TypedSet intersectTypedSet = new TypedSet(type, blockBuilder, rightPositionCount, "array_intersect");
TypedSet intersectTypedSet = new TypedSet(type, Optional.of(elementIsDistinctFrom), blockBuilder, rightPositionCount, "array_intersect");
for (int i = 0; i < leftPositionCount; i++) {
if (rightTypedSet.contains(leftArray, i)) {
intersectTypedSet.add(leftArray, i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static io.airlift.slice.Slices.utf8Slice;
Expand Down Expand Up @@ -129,7 +130,7 @@ public void testGetElementPositionWithProvidedEmptyBlockBuilder()
int initialTypedSetEntryCount = 10;

BlockBuilder emptyBlockBuilder = BIGINT.createFixedSizeBlockBuilder(elementCount);
TypedSet typedSet = new TypedSet(BIGINT, emptyBlockBuilder, initialTypedSetEntryCount, FUNCTION_NAME);
TypedSet typedSet = new TypedSet(BIGINT, Optional.empty(), emptyBlockBuilder, initialTypedSetEntryCount, FUNCTION_NAME);
BlockBuilder externalBlockBuilder = BIGINT.createFixedSizeBlockBuilder(elementCount);
for (int i = 0; i < elementCount; i++) {
if (i % 10 == 0) {
Expand Down Expand Up @@ -167,7 +168,7 @@ public void testGetElementPositionWithProvidedNonEmptyBlockBuilder()
// The secondBlockBuilder should already have elementCount rows.
BlockBuilder secondBlockBuilder = pageBuilder.getBlockBuilder(0);

TypedSet typedSet = new TypedSet(BIGINT, secondBlockBuilder, initialTypedSetEntryCount, FUNCTION_NAME);
TypedSet typedSet = new TypedSet(BIGINT, Optional.empty(), secondBlockBuilder, initialTypedSetEntryCount, FUNCTION_NAME);
BlockBuilder externalBlockBuilder = BIGINT.createFixedSizeBlockBuilder(elementCount);
for (int i = 0; i < elementCount; i++) {
if (i % 10 == 0) {
Expand Down Expand Up @@ -195,7 +196,7 @@ public void testGetElementPositionRandom()
testGetElementPositionRandomFor(set);

BlockBuilder emptyBlockBuilder = VARCHAR.createBlockBuilder(null, 3);
TypedSet setWithPassedInBuilder = new TypedSet(VARCHAR, emptyBlockBuilder, 1, FUNCTION_NAME);
TypedSet setWithPassedInBuilder = new TypedSet(VARCHAR, Optional.empty(), emptyBlockBuilder, 1, FUNCTION_NAME);
testGetElementPositionRandomFor(setWithPassedInBuilder);
}

Expand Down Expand Up @@ -275,7 +276,7 @@ private static void testBigint(Block longBlock, int expectedSetSize)
testBigintFor(typedSet, longBlock);

BlockBuilder emptyBlockBuilder = BIGINT.createBlockBuilder(null, expectedSetSize);
TypedSet typedSetWithPassedInBuilder = new TypedSet(BIGINT, emptyBlockBuilder, expectedSetSize, FUNCTION_NAME);
TypedSet typedSetWithPassedInBuilder = new TypedSet(BIGINT, Optional.empty(), emptyBlockBuilder, expectedSetSize, FUNCTION_NAME);
testBigintFor(typedSetWithPassedInBuilder, longBlock);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,16 @@ public void testDistinct()
assertFunction("ARRAY_DISTINCT(ARRAY [NULL, NULL])", new ArrayType(UNKNOWN), asList((Object) null));
assertFunction("ARRAY_DISTINCT(ARRAY [NULL, NULL, NULL])", new ArrayType(UNKNOWN), asList((Object) null));

// Indeterminate values
assertFunction(
"ARRAY_DISTINCT(ARRAY[(123, 'abc'), (123, NULL)])",
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, createVarcharType(3)))),
ImmutableList.of(asList(123, "abc"), asList(123, null)));
assertFunction(
"ARRAY_DISTINCT(ARRAY[(NULL, NULL), (42, 'def'), (NULL, 'abc'), (123, NULL), (42, 'def'), (NULL, NULL), (NULL, 'abc'), (123, NULL)])",
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, createVarcharType(3)))),
ImmutableList.of(asList(null, null), asList(42, "def"), asList(null, "abc"), asList(123, null)));

// Test for BIGINT-optimized implementation
assertFunction("ARRAY_DISTINCT(ARRAY [CAST(5 AS BIGINT), NULL, CAST(12 AS BIGINT), NULL])", new ArrayType(BIGINT), asList(5L, null, 12L));
assertFunction("ARRAY_DISTINCT(ARRAY [CAST(100 AS BIGINT), NULL, CAST(100 AS BIGINT), NULL, 0, -2, 0])", new ArrayType(BIGINT), asList(100L, null, 0L, -2L));
Expand Down Expand Up @@ -1192,13 +1202,25 @@ public void testArrayIntersect()
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, createVarcharType(3)))),
ImmutableList.of());

// test unsupported
assertNotSupported(
// Indeterminate values
assertFunction(
"ARRAY_INTERSECT(ARRAY[(123, 'abc'), (123, NULL)], ARRAY[(123, 'abc'), (123, NULL)])",
"ROW comparison not supported for fields with null elements");
assertNotSupported(
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, createVarcharType(3)))),
ImmutableList.of(asList(123, "abc"), asList(123, null)));
assertFunction(
"ARRAY_INTERSECT(ARRAY[(NULL, 'abc'), (123, 'abc')], ARRAY[(123, 'abc'),(NULL, 'abc')])",
"ROW comparison not supported for fields with null elements");
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, createVarcharType(3)))),
ImmutableList.of(asList(null, "abc"), asList(123, "abc")));
assertFunction(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to add a test with repeated entries in one (or both) of the arrays.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

"ARRAY_INTERSECT(" +
"ARRAY[(NULL, 'abc'), (123, 'abc'), (NULL, 'def'), (NULL, 'abc')], " +
"ARRAY[(123, 'abc'), (NULL, 'abc'), (123, 'def'), (123, 'abc'), (123, 'abc'), (123, 'abc'), (123, 'abc'), (NULL, 'abc'), (NULL, 'abc'), (NULL, 'abc')])",
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, createVarcharType(3)))),
ImmutableList.of(asList(123, "abc"), asList(null, "abc")));
assertFunction(
"ARRAY_INTERSECT(ARRAY[(123, 456), (123, NULL), (42, 43)], ARRAY[(123, NULL), (123, 456), (42, NULL), (NULL, 43)])",
new ArrayType(RowType.anonymous(ImmutableList.of(INTEGER, INTEGER))),
ImmutableList.of(asList(123, null), asList(123, 456)));

assertCachedInstanceHasBoundedRetainedSize("ARRAY_INTERSECT(ARRAY ['foo', 'bar', 'baz'], ARRAY ['foo', 'test', 'bar'])");
}
Expand Down