Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,32 +162,6 @@ public Work<int[]> getGroupIds(Page page)
return new GetGroupIdsWork(block);
}

@Override
public boolean contains(int position, Page page)
{
Block block = page.getBlock(0);
if (block.isNull(position)) {
return nullGroupId >= 0;
}

long value = BIGINT.getLong(block, position);
int hashPosition = getHashPosition(value, mask);

// look for an empty slot or a slot containing this key
while (true) {
int groupId = groupIds[hashPosition];
if (groupId == -1) {
return false;
}
if (value == values[hashPosition]) {
return true;
}

// increment position and mask to handle wrap around
hashPosition = (hashPosition + 1) & mask;
}
}

@Override
public long getRawHash(int groupId)
{
Expand Down
110 changes: 55 additions & 55 deletions core/trino-main/src/main/java/io/trino/operator/ChannelSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,37 @@
*/
package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
import io.trino.sql.gen.JoinCompiler;

import static io.trino.operator.GroupByHash.createGroupByHash;
import static io.trino.type.UnknownType.UNKNOWN;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.BLOCK_POSITION_NOT_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationArgumentConvention.FLAT;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FAIL_ON_NULL;
import static io.trino.spi.function.InvocationConvention.InvocationReturnConvention.FLAT_RETURN;
import static io.trino.spi.function.InvocationConvention.simpleConvention;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.util.Objects.requireNonNull;

public class ChannelSet
{
private final GroupByHash hash;
private final boolean containsNull;
private final FlatSet set;

private ChannelSet(GroupByHash hash, boolean containsNull)
private ChannelSet(FlatSet set)
{
this.hash = hash;
this.containsNull = containsNull;
this.set = set;
}

public long getEstimatedSizeInBytes()
{
return hash.getEstimatedSize();
return set.getEstimatedSize();
}

public int size()
{
return hash.getGroupCount();
return set.size();
}

public boolean isEmpty()
Expand All @@ -53,67 +53,67 @@ public boolean isEmpty()

public boolean containsNull()
{
return containsNull;
return set.containsNull();
}

public boolean contains(int position, Page page)
public boolean contains(Block valueBlock, int position)
{
return hash.contains(position, page);
return set.contains(valueBlock, position);
}

public boolean contains(int position, Page page, long rawHash)
public boolean contains(Block valueBlock, int position, long rawHash)
{
return hash.contains(position, page, rawHash);
return set.contains(valueBlock, position, rawHash);
}

public static class ChannelSetBuilder
{
private final Type type;
private final OperatorContext operatorContext;
private final LocalMemoryContext localMemoryContext;
private final GroupByHash hash;
private final LocalMemoryContext memoryContext;
private final FlatSet set;

public ChannelSetBuilder(Type type, boolean hasPrecomputedHash, int expectedPositions, OperatorContext operatorContext, JoinCompiler joinCompiler, TypeOperators typeOperators)
public ChannelSetBuilder(Type type, TypeOperators typeOperators, LocalMemoryContext memoryContext)
{
this.type = requireNonNull(type, "type is null");
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.localMemoryContext = operatorContext.localUserMemoryContext();
this.hash = createGroupByHash(
operatorContext.getSession(),
ImmutableList.of(type),
hasPrecomputedHash,
expectedPositions,
joinCompiler,
typeOperators,
this::updateMemoryReservation);
set = new FlatSet(
type,
typeOperators.getReadValueOperator(type, simpleConvention(FLAT_RETURN, BLOCK_POSITION_NOT_NULL)),
typeOperators.getHashCodeOperator(type, simpleConvention(FAIL_ON_NULL, FLAT)),
typeOperators.getDistinctFromOperator(type, simpleConvention(FAIL_ON_NULL, FLAT, BLOCK_POSITION_NOT_NULL)),
typeOperators.getHashCodeOperator(type, simpleConvention(FAIL_ON_NULL, BLOCK_POSITION_NOT_NULL)));
this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
this.memoryContext.setBytes(set.getEstimatedSize());
}

public ChannelSet build()
{
Page nullBlockPage = new Page(type.createBlockBuilder(null, 1, UNKNOWN.getFixedSize()).appendNull().build());
boolean containsNull = hash.contains(0, nullBlockPage);
return new ChannelSet(hash, containsNull);
return new ChannelSet(set);
}

public Work<?> addPage(Page page)
public void addAll(Block valueBlock, Block hashBlock)
{
// Just add the page to the pending work, which will be processed later.
return hash.addPage(page);
}

public boolean updateMemoryReservation()
{
// If memory is not available, once we return, this operator will be blocked until memory is available.
localMemoryContext.setBytes(hash.getEstimatedSize());

// If memory is not available, inform the caller that we cannot proceed for allocation.
return operatorContext.isWaitingForMemory().isDone();
}

@VisibleForTesting
public int getCapacity()
{
return hash.getCapacity();
if (valueBlock.getPositionCount() == 0) {
return;
}

if (valueBlock instanceof RunLengthEncodedBlock rleBlock) {
if (hashBlock != null) {
set.add(rleBlock.getValue(), 0, BIGINT.getLong(hashBlock, 0));
}
else {
set.add(rleBlock.getValue(), 0);
}
}
else if (hashBlock != null) {
for (int position = 0; position < valueBlock.getPositionCount(); position++) {
set.add(valueBlock, position, BIGINT.getLong(hashBlock, position));
}
}
else {
for (int position = 0; position < valueBlock.getPositionCount(); position++) {
set.add(valueBlock, position);
}
}

memoryContext.setBytes(set.getEstimatedSize());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,32 +161,6 @@ public Work<int[]> getGroupIds(Page page)
return new GetNonDictionaryGroupIdsWork(blocks);
}

@Override
public boolean contains(int position, Page page)
{
return flatHash.contains(getBlocksForContainsPage(page), position);
}

@Override
public boolean contains(int position, Page page, long hash)
{
return flatHash.contains(getBlocksForContainsPage(page), position, hash);
}

private Block[] getBlocksForContainsPage(Page page)
{
// contains page only has the group by channels as the optional hash is passed directly
checkArgument(page.getChannelCount() == groupByChannelCount);
Block[] blocks = currentBlocks;
for (int i = 0; i < page.getChannelCount(); i++) {
blocks[i] = page.getBlock(i);
}
if (hasPrecomputedHash) {
blocks[blocks.length - 1] = null;
}
return blocks;
}

@VisibleForTesting
@Override
public int getCapacity()
Expand Down Expand Up @@ -442,7 +416,7 @@ public AddRunLengthEncodedPageWork(Block[] blocks)
{
for (int i = 0; i < blocks.length; i++) {
// GroupBy blocks are guaranteed to be RLE, but hash block might not be an RLE due to bugs
// use getSingleValueBlock here which for RLE is a no-op, but will still work if hash block is not RLE
// use getSingleValueBlock here, which for RLE is a no-op, but will still work if hash block is not RLE
blocks[i] = blocks[i].getSingleValueBlock(0);
}
this.blocks = blocks;
Expand Down Expand Up @@ -639,7 +613,7 @@ public GetRunLengthEncodedGroupIdsWork(Block[] blocks)
positionCount = blocks[0].getPositionCount();
for (int i = 0; i < blocks.length; i++) {
// GroupBy blocks are guaranteed to be RLE, but hash block might not be an RLE due to bugs
// use getSingleValueBlock here which for RLE is a no-op, but will still work if hash block is not RLE
// use getSingleValueBlock here, which for RLE is a no-op, but will still work if hash block is not RLE
blocks[i] = blocks[i].getSingleValueBlock(0);
}
this.blocks = blocks;
Expand Down
Loading