Merged
@@ -21,7 +21,6 @@
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.AbstractLongType;
import io.trino.spi.type.BigintType;

import java.util.Arrays;
@@ -33,7 +32,6 @@
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.type.TypeUtils.NULL_HASH_CODE;
import static it.unimi.dsi.fastutil.HashCommon.arraySize;
import static it.unimi.dsi.fastutil.HashCommon.murmurHash3;
import static java.lang.Math.min;
@@ -48,8 +46,6 @@ public class BigintGroupByHash

private static final float FILL_RATIO = 0.75f;

private final boolean outputRawHash;

private int hashCapacity;
private int maxFill;
private int mask;
@@ -72,12 +68,10 @@ public class BigintGroupByHash
private long preallocatedMemoryInBytes;
private long currentPageSizeInBytes;

public BigintGroupByHash(boolean outputRawHash, int expectedSize, UpdateMemory updateMemory)
public BigintGroupByHash(int expectedSize, UpdateMemory updateMemory)
{
checkArgument(expectedSize > 0, "expectedSize must be greater than zero");

this.outputRawHash = outputRawHash;

hashCapacity = arraySize(expectedSize, FILL_RATIO);

maxFill = calculateMaxFill(hashCapacity);
@@ -95,7 +89,6 @@ public BigintGroupByHash(boolean outputRawHash, int expectedSize, UpdateMemory u

private BigintGroupByHash(BigintGroupByHash other)
{
outputRawHash = other.outputRawHash;
hashCapacity = other.hashCapacity;
maxFill = other.maxFill;
mask = other.mask;
@@ -137,16 +130,6 @@ public void appendValuesTo(int groupId, PageBuilder pageBuilder)
else {
BIGINT.writeLong(blockBuilder, valuesByGroupId[groupId]);
}

if (outputRawHash) {
BlockBuilder hashBlockBuilder = pageBuilder.getBlockBuilder(1);
if (groupId == nullGroupId) {
BIGINT.writeLong(hashBlockBuilder, NULL_HASH_CODE);
}
else {
BIGINT.writeLong(hashBlockBuilder, AbstractLongType.hash(valuesByGroupId[groupId]));
}
}
}

@Override
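Note on the class being simplified above: BigintGroupByHash is the specialization used when the only grouping column is a BIGINT, and (as the fastutil arraySize/murmurHash3 imports and the 0.75 fill ratio suggest) it is an open-addressing table over primitive longs. The following is a minimal, self-contained sketch of that kind of table, not Trino's actual implementation; names are illustrative and growth, null handling, and memory accounting are omitted.

import java.util.Arrays;

import static it.unimi.dsi.fastutil.HashCommon.arraySize;
import static it.unimi.dsi.fastutil.HashCommon.murmurHash3;

// Minimal open-addressing group-by table over primitive long keys.
final class LongGroupTableSketch
{
    private static final float FILL_RATIO = 0.75f;

    private final long[] values;   // key stored in each slot
    private final int[] groupIds;  // group id per slot, -1 marks an empty slot
    private final int mask;
    private int nextGroupId;

    LongGroupTableSketch(int expectedSize)
    {
        int capacity = arraySize(expectedSize, FILL_RATIO);  // next power of two for the load factor
        values = new long[capacity];
        groupIds = new int[capacity];
        Arrays.fill(groupIds, -1);
        mask = capacity - 1;
    }

    // Returns the group id for value, assigning the next id the first time it is seen.
    int putIfAbsent(long value)
    {
        int slot = (int) murmurHash3(value) & mask;
        while (groupIds[slot] != -1) {
            if (values[slot] == value) {
                return groupIds[slot];  // existing group
            }
            slot = (slot + 1) & mask;   // linear probing
        }
        values[slot] = value;
        groupIds[slot] = nextGroupId;
        return nextGroupId++;
    }
}

What the commit removes is only the optional second output column that carried a precomputed hash of each group's value; the table itself is unchanged.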
@@ -22,7 +22,6 @@
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -45,7 +44,6 @@ public static class DistinctLimitOperatorFactory
private final List<Integer> distinctChannels;
private final List<Type> sourceTypes;
private final long limit;
private final Optional<Integer> hashChannel;
private boolean closed;
private final FlatHashStrategyCompiler hashStrategyCompiler;

@@ -55,7 +53,6 @@ public DistinctLimitOperatorFactory(
List<? extends Type> sourceTypes,
List<Integer> distinctChannels,
long limit,
Optional<Integer> hashChannel,
FlatHashStrategyCompiler hashStrategyCompiler)
{
this.operatorId = operatorId;
@@ -65,7 +62,6 @@

checkArgument(limit >= 0, "limit must be at least zero");
this.limit = limit;
this.hashChannel = requireNonNull(hashChannel, "hashChannel is null");
this.hashStrategyCompiler = requireNonNull(hashStrategyCompiler, "hashStrategyCompiler is null");
}

@@ -77,7 +73,7 @@ public Operator createOperator(DriverContext driverContext)
List<Type> distinctTypes = distinctChannels.stream()
.map(sourceTypes::get)
.collect(toImmutableList());
return new DistinctLimitOperator(operatorContext, distinctChannels, distinctTypes, limit, hashChannel, hashStrategyCompiler);
return new DistinctLimitOperator(operatorContext, distinctChannels, distinctTypes, limit, hashStrategyCompiler);
}

@Override
@@ -89,7 +85,7 @@ public void noMoreOperators()
@Override
public OperatorFactory duplicate()
{
return new DistinctLimitOperatorFactory(operatorId, planNodeId, sourceTypes, distinctChannels, limit, hashChannel, hashStrategyCompiler);
return new DistinctLimitOperatorFactory(operatorId, planNodeId, sourceTypes, distinctChannels, limit, hashStrategyCompiler);
}
}

@@ -114,29 +110,17 @@ public DistinctLimitOperator(
List<Integer> distinctChannels,
List<Type> distinctTypes,
long limit,
Optional<Integer> hashChannel,
FlatHashStrategyCompiler hashStrategyCompiler)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.localUserMemoryContext = operatorContext.localUserMemoryContext();
checkArgument(limit >= 0, "limit must be at least zero");
checkArgument(distinctTypes.size() == distinctChannels.size(), "distinctTypes and distinctChannels sizes don't match");

if (hashChannel.isPresent()) {
this.inputChannels = new int[distinctChannels.size() + 1];
for (int i = 0; i < distinctChannels.size(); i++) {
this.inputChannels[i] = distinctChannels.get(i);
}
this.inputChannels[distinctChannels.size()] = hashChannel.get();
}
else {
this.inputChannels = Ints.toArray(distinctChannels);
}
this.inputChannels = Ints.toArray(distinctChannels);

this.groupByHash = createGroupByHash(
operatorContext.getSession(),
distinctTypes,
hashChannel.isPresent(),
false,
toIntExact(min(limit, 10_000)),
hashStrategyCompiler,
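With the hash channel gone, the operator's input channels are exactly the distinct channels, and the group-by hash is sized to min(limit, 10_000). The operator's contract is unchanged: a row is passed through only the first time its distinct key is seen, and input stops once the limit of distinct keys is reached. A rough stand-alone sketch of that contract, using a plain HashSet instead of a GroupByHash over Pages (illustrative only):

import java.util.HashSet;
import java.util.Set;

// Emit each distinct key at most once and stop after 'limit' distinct keys.
final class DistinctLimitSketch
{
    private final Set<Long> seen = new HashSet<>();
    private final long limit;

    DistinctLimitSketch(long limit)
    {
        this.limit = limit;
    }

    // true if the row should be emitted (its key is new and the limit is not yet reached)
    boolean accept(long key)
    {
        if (seen.size() >= limit) {
            return false;  // finished: no more distinct keys are needed
        }
        return seen.add(key);
    }

    boolean isFinished()
    {
        return seen.size() >= limit;
    }
}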
@@ -76,7 +76,7 @@ public FlatGroupByHash(

checkArgument(expectedSize > 0, "expectedSize must be greater than zero");

int totalChannels = hashTypes.size() + (hashMode.isHashPrecomputed() ? 1 : 0);
int totalChannels = hashTypes.size();
this.currentBlocks = new Block[totalChannels];
this.currentBlockBuilders = new BlockBuilder[totalChannels];

@@ -230,17 +230,7 @@ private void updateDictionaryLookBack(Block dictionary)

private boolean canProcessDictionary(Block[] blocks)
{
if (!processDictionary || !(blocks[0] instanceof DictionaryBlock inputDictionary)) {
return false;
}

if (!hashMode.isHashPrecomputed()) {
return true;
}

// dictionarySourceIds of data block and hash block must match
return blocks[1] instanceof DictionaryBlock hashDictionary &&
hashDictionary.getDictionarySourceId().equals(inputDictionary.getDictionarySourceId());
return processDictionary && blocks[0] instanceof DictionaryBlock;
}

private boolean canProcessLowCardinalityDictionary(Block[] blocks)
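The old check existed because, with a precomputed hash column, the dictionary fast path also needed the hash block to be dictionary-encoded with the same dictionarySourceId as the data block so their positions lined up; without that column, it is enough that dictionary processing is enabled and the single grouping block is a DictionaryBlock. The point of the fast path is to resolve a group id once per distinct dictionary entry and reuse it for every row that references that entry. A self-contained illustration of the idea, using plain arrays rather than Trino's DictionaryBlock (names are hypothetical):

import java.util.Arrays;
import java.util.function.LongToIntFunction;

final class DictionaryFastPathSketch
{
    // dictionary: the distinct values; ids[row]: index into dictionary for each row;
    // putIfAbsent: maps a value to its group id (e.g. backed by a hash table).
    static int[] groupIds(long[] dictionary, int[] ids, LongToIntFunction putIfAbsent)
    {
        int[] groupIdByDictionaryPosition = new int[dictionary.length];
        Arrays.fill(groupIdByDictionaryPosition, -1);

        int[] groupIds = new int[ids.length];
        for (int row = 0; row < ids.length; row++) {
            int dictionaryPosition = ids[row];
            int groupId = groupIdByDictionaryPosition[dictionaryPosition];
            if (groupId == -1) {
                // first occurrence of this dictionary entry: hash and insert it once
                groupId = putIfAbsent.applyAsInt(dictionary[dictionaryPosition]);
                groupIdByDictionaryPosition[dictionaryPosition] = groupId;
            }
            groupIds[row] = groupId;  // repeated entries reuse the cached group id
        }
        return groupIds;
    }
}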
27 changes: 2 additions & 25 deletions in core/trino-main/src/main/java/io/trino/operator/FlatHash.java
@@ -27,7 +27,6 @@
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.AppendOnlyVariableWidthData.getChunkOffset;
import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.trino.spi.type.BigintType.BIGINT;
import static java.lang.Math.addExact;
import static java.lang.Math.max;
import static java.lang.Math.multiplyExact;
@@ -63,7 +62,6 @@ private static int calculateMaxFill(int capacity)
private final AppendOnlyVariableWidthData variableWidthData;
private final UpdateMemory checkMemoryReservation;

private final boolean hasPrecomputedHash;
private final boolean cacheHashValue;
private final int fixedRecordSize;
private final int variableWidthOffset;
@@ -87,7 +85,6 @@ public FlatHash(FlatHashStrategy flatHashStrategy, GroupByHashMode hashMode, int
boolean hasVariableData = flatHashStrategy.isAnyVariableWidth();
this.variableWidthData = hasVariableData ? new AppendOnlyVariableWidthData() : null;
requireNonNull(hashMode, "hashMode is null");
this.hasPrecomputedHash = hashMode.isHashPrecomputed();
this.cacheHashValue = hashMode.isHashCached();

// the record is laid out as follows:
@@ -114,7 +111,6 @@ public FlatHash(FlatHash other)
this.flatHashStrategy = other.flatHashStrategy;
this.checkMemoryReservation = other.checkMemoryReservation;
this.variableWidthData = other.variableWidthData == null ? null : new AppendOnlyVariableWidthData(other.variableWidthData);
this.hasPrecomputedHash = other.hasPrecomputedHash;
this.cacheHashValue = other.cacheHashValue;
this.fixedRecordSize = other.fixedRecordSize;
this.variableWidthOffset = other.variableWidthOffset;
@@ -198,35 +194,16 @@ public void appendTo(int groupId, BlockBuilder[] blockBuilders)
variableWidthChunk,
variableChunkOffset,
blockBuilders);

if (hasPrecomputedHash) {
BIGINT.writeLong(blockBuilders[blockBuilders.length - 1], (long) LONG_HANDLE.get(fixedSizeRecords, recordOffset));
}
}

public void computeHashes(Block[] blocks, long[] hashes, int offset, int length)
{
if (hasPrecomputedHash) {
Block hashBlock = blocks[blocks.length - 1];
for (int i = 0; i < length; i++) {
hashes[i] = BIGINT.getLong(hashBlock, offset + i);
}
}
else {
flatHashStrategy.hashBlocksBatched(blocks, hashes, offset, length);
}
flatHashStrategy.hashBlocksBatched(blocks, hashes, offset, length);
}

public int putIfAbsent(Block[] blocks, int position)
{
long hash;
if (hasPrecomputedHash) {
hash = BIGINT.getLong(blocks[blocks.length - 1], position);
}
else {
hash = flatHashStrategy.hash(blocks, position);
}

long hash = flatHashStrategy.hash(blocks, position);
return putIfAbsent(blocks, position, hash);
}

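After this change FlatHash always derives hashes from the grouping columns themselves: computeHashes delegates to flatHashStrategy.hashBlocksBatched and putIfAbsent calls flatHashStrategy.hash, instead of reading a trailing precomputed-hash block. The sketch below only illustrates the shape of such a batched, multi-column hash loop; the real FlatHashStrategy is generated per type set and its hash functions differ, so the combining function here is an arbitrary placeholder.

// Combine per-column hashes row by row for a batch of positions,
// rather than copying values out of a precomputed hash column.
final class BatchedHashSketch
{
    static void hashRowsBatched(long[][] columns, long[] hashes, int offset, int length)
    {
        for (int i = 0; i < length; i++) {
            long hash = 0;
            for (long[] column : columns) {
                hash = hash * 31 + Long.hashCode(column[offset + i]);  // placeholder combiner
            }
            hashes[i] = hash;
        }
    }
}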
10 changes: 3 additions & 7 deletions in core/trino-main/src/main/java/io/trino/operator/GroupByHash.java
@@ -34,7 +34,6 @@ public interface GroupByHash
static GroupByHash createGroupByHash(
Session session,
List<Type> types,
boolean hasPrecomputedHash,
boolean spillable,
int expectedSize,
FlatHashStrategyCompiler hashStrategyCompiler,
@@ -43,18 +42,15 @@
boolean dictionaryAggregationEnabled = isDictionaryAggregationEnabled(session);
return createGroupByHash(
types,
selectGroupByHashMode(hasPrecomputedHash, spillable, types),
selectGroupByHashMode(spillable, types),
expectedSize,
dictionaryAggregationEnabled,
hashStrategyCompiler,
updateMemory);
}

static GroupByHashMode selectGroupByHashMode(boolean hasPrecomputedHash, boolean spillable, List<Type> types)
static GroupByHashMode selectGroupByHashMode(boolean spillable, List<Type> types)
{
if (hasPrecomputedHash) {
return GroupByHashMode.PRECOMPUTED;
}
// Spillable aggregations should always cache hash values since spilling requires sorting by the hash value
if (spillable) {
return GroupByHashMode.CACHED;
@@ -92,7 +88,7 @@
UpdateMemory updateMemory)
{
if (types.size() == 1 && types.get(0).equals(BIGINT)) {
return new BigintGroupByHash(hashMode.isHashPrecomputed(), expectedSize, updateMemory);
return new BigintGroupByHash(expectedSize, updateMemory);
}
return new FlatGroupByHash(
types,
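Two things are visible in the hunks above: selectGroupByHashMode no longer has a PRECOMPUTED branch (spillable aggregations still force CACHED, since spilling sorts by the hash value), and the factory still specializes a single BIGINT grouping column to BigintGroupByHash, now constructed without the flag. A stand-alone sketch of that specialization decision, with stand-in types instead of Trino's Type and GroupByHash (purely illustrative):

import java.util.List;

final class GroupByHashFactorySketch
{
    interface GroupTable {}
    record LongTable() implements GroupTable {}     // stand-in for BigintGroupByHash
    record GenericTable() implements GroupTable {}  // stand-in for FlatGroupByHash

    // A single BIGINT grouping column gets the primitive-specialized table;
    // every other combination of types goes through the generic flat hash.
    static GroupTable create(List<String> typeNames)
    {
        if (typeNames.size() == 1 && typeNames.get(0).equals("bigint")) {
            return new LongTable();
        }
        return new GenericTable();
    }
}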
@@ -14,8 +14,6 @@
package io.trino.operator;

public enum GroupByHashMode {
// Hash values are pre-computed as input, and emitted as output
PRECOMPUTED,
// Hash values are computed by the FlatGroupByHash instance and stored along with the entry. This consumes more
// memory, but makes re-hashing cheaper by avoiding the need to re-compute each hash code and also makes the
// valueIdentical check cheaper by avoiding a deep equality check when hashes don't match
@@ -24,15 +22,10 @@ public enum GroupByHashMode {
// table which saves memory, but can be more expensive during rehash.
ON_DEMAND;

public boolean isHashPrecomputed()
{
return this == PRECOMPUTED;
}

public boolean isHashCached()
{
return switch (this) {
case PRECOMPUTED, CACHED -> true;
case CACHED -> true;
case ON_DEMAND -> false;
};
}
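The enum's remaining comments describe the trade-off that is left: CACHED stores each entry's hash alongside the entry, which costs memory but makes rehashing (and negative equality checks) cheap, while ON_DEMAND recomputes hashes whenever the table grows. A self-contained sketch of just that rehash difference (hypothetical names; a power-of-two capacity larger than the number of keys is assumed):

import java.util.function.LongUnaryOperator;

final class RehashSketch
{
    // Move all entries into a larger table. In CACHED mode the stored hash is reused;
    // in ON_DEMAND mode every key is hashed again on each rehash.
    static long[] rehash(long[] keys, long[] cachedHashes, boolean hashCached, LongUnaryOperator hashFunction, int newCapacity)
    {
        long[] newKeys = new long[newCapacity];
        boolean[] occupied = new boolean[newCapacity];
        int mask = newCapacity - 1;  // newCapacity must be a power of two
        for (int i = 0; i < keys.length; i++) {
            long hash = hashCached
                    ? cachedHashes[i]                     // more memory per entry, cheap growth
                    : hashFunction.applyAsLong(keys[i]);  // less memory, extra CPU on every growth
            int slot = (int) hash & mask;
            while (occupied[slot]) {
                slot = (slot + 1) & mask;  // linear probing
            }
            newKeys[slot] = keys[i];
            occupied[slot] = true;
        }
        return newKeys;
    }
}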
@@ -31,7 +31,7 @@ public class GroupByHashPageIndexer

public GroupByHashPageIndexer(List<Type> hashTypes, FlatHashStrategyCompiler hashStrategyCompiler)
{
this(GroupByHash.createGroupByHash(hashTypes, selectGroupByHashMode(false, false, hashTypes), 20, false, hashStrategyCompiler, NOOP));
this(GroupByHash.createGroupByHash(hashTypes, selectGroupByHashMode(false, hashTypes), 20, false, hashStrategyCompiler, NOOP));
}

public GroupByHashPageIndexer(GroupByHash hash)