Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.array.LongBigArray;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.block.DictionaryBlock;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.PrestoException;
Expand All @@ -28,6 +29,7 @@
import com.google.common.collect.ImmutableList;
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.List;

import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class BigintGroupByHash
private final LongBigArray valuesByGroupId;

private int nextGroupId;
private DictionaryLookBack dictionaryLookBack;
private long hashCollisions;
private double expectedHashCollisions;

Expand Down Expand Up @@ -162,6 +165,11 @@ public void appendValuesTo(int groupId, PageBuilder pageBuilder, int outputChann
public Work<?> addPage(Page page)
{
currentPageSizeInBytes = page.getRetainedSizeInBytes();
Block block = page.getBlock(hashChannel);
if (block instanceof DictionaryBlock) {
return new AddDictionaryPageWork((DictionaryBlock) block);
}

return new AddPageWork(page.getBlock(hashChannel));
}

Expand All @@ -177,6 +185,11 @@ public List<Page> getBufferedPages()
public Work<GroupByIdBlock> getGroupIds(Page page)
{
currentPageSizeInBytes = page.getRetainedSizeInBytes();
Block block = page.getBlock(hashChannel);
if (block instanceof DictionaryBlock) {
return new GetDictionaryGroupIdsWork((DictionaryBlock) block);
}

return new GetGroupIdsWork(page.getBlock(hashChannel));
}

Expand Down Expand Up @@ -278,7 +291,7 @@ private boolean tryRehash()

// An estimate of how much extra memory is needed before we can go ahead and expand the hash table.
// This includes the new capacity for values, groupIds, and valuesByGroupId as well as the size of the current page
preallocatedMemoryInBytes = newCapacity * (long) (Long.BYTES + Integer.BYTES) + calculateMaxFill(newCapacity) * Long.BYTES + currentPageSizeInBytes;
preallocatedMemoryInBytes = newCapacity * (long) (Long.BYTES + Integer.BYTES) + (long) calculateMaxFill(newCapacity) * Long.BYTES + currentPageSizeInBytes;
if (!updateMemory.update()) {
// reserved memory but has exceeded the limit
return false;
Expand Down Expand Up @@ -345,7 +358,26 @@ private static int calculateMaxFill(int hashSize)
return maxFill;
}

private class AddPageWork
private void updateDictionaryLookBack(Block dictionary)
{
if (dictionaryLookBack == null || dictionaryLookBack.getDictionary() != dictionary) {
dictionaryLookBack = new DictionaryLookBack(dictionary);
}
}

private int getGroupId(Block dictionary, int positionInDictionary)
{
if (dictionaryLookBack.isProcessed(positionInDictionary)) {
return dictionaryLookBack.getGroupId(positionInDictionary);
}

int groupId = putIfAbsent(positionInDictionary, dictionary);
dictionaryLookBack.setProcessed(positionInDictionary, groupId);
return groupId;
}

@VisibleForTesting
class AddPageWork
implements Work<Void>
{
private final Block block;
Expand Down Expand Up @@ -386,6 +418,51 @@ public Void getResult()
}
}

private class AddDictionaryPageWork
implements Work<Void>
{
private final Block dictionary;
private final DictionaryBlock block;

private int lastPosition;

public AddDictionaryPageWork(DictionaryBlock block)
{
this.block = requireNonNull(block, "block is null");
this.dictionary = block.getDictionary();
updateDictionaryLookBack(dictionary);
}

@Override
public boolean process()
{
int positionCount = block.getPositionCount();
checkState(lastPosition <= positionCount, "position count out of bound");

// needRehash() == false indicates we have reached capacity boundary and a rehash is needed.
// We can only proceed if tryRehash() successfully did a rehash.
if (needRehash() && !tryRehash()) {
return false;
}

// putIfAbsent will rehash automatically if rehash is needed, unless there isn't enough memory to do so.
// Therefore needRehash will not generally return true even if we have just crossed the capacity boundary.
while (lastPosition < positionCount && !needRehash()) {
int positionInDictionary = block.getId(lastPosition);
getGroupId(dictionary, positionInDictionary);
lastPosition++;
}
return lastPosition == positionCount;
}

@Override
public Void getResult()
{
throw new UnsupportedOperationException();
}
}

@VisibleForTesting
private class GetGroupIdsWork
implements Work<GroupByIdBlock>
{
Expand Down Expand Up @@ -434,4 +511,91 @@ public GroupByIdBlock getResult()
return new GroupByIdBlock(nextGroupId, blockBuilder.build());
}
}

private class GetDictionaryGroupIdsWork
implements Work<GroupByIdBlock>
{
private final BlockBuilder blockBuilder;
private final Block dictionary;
private final DictionaryBlock block;

private boolean finished;
private int lastPosition;

public GetDictionaryGroupIdsWork(DictionaryBlock block)
{
this.block = requireNonNull(block, "block is null");
this.dictionary = block.getDictionary();
updateDictionaryLookBack(dictionary);

// we know the exact size required for the block
this.blockBuilder = BIGINT.createFixedSizeBlockBuilder(block.getPositionCount());
}

@Override
public boolean process()
{
int positionCount = block.getPositionCount();
checkState(lastPosition <= positionCount, "position count out of bound");
checkState(!finished);

// needRehash() == false indicates we have reached capacity boundary and a rehash is needed.
// We can only proceed if tryRehash() successfully did a rehash.
if (needRehash() && !tryRehash()) {
return false;
}

// putIfAbsent will rehash automatically if rehash is needed, unless there isn't enough memory to do so.
// Therefore needRehash will not generally return true even if we have just crossed the capacity boundary.
while (lastPosition < positionCount && !needRehash()) {
int positionInDictionary = block.getId(lastPosition);
int groupId = getGroupId(dictionary, positionInDictionary);
BIGINT.writeLong(blockBuilder, groupId);
lastPosition++;
}
return lastPosition == positionCount;
}

@Override
public GroupByIdBlock getResult()
{
checkState(lastPosition == block.getPositionCount(), "process has not yet finished");
checkState(!finished, "result has produced");
finished = true;
return new GroupByIdBlock(nextGroupId, blockBuilder.build());
}
}

private static final class DictionaryLookBack
{
private final Block dictionary;
private final int[] processed;

public DictionaryLookBack(Block dictionary)
{
this.dictionary = dictionary;
this.processed = new int[dictionary.getPositionCount()];
Arrays.fill(processed, -1);
}

public Block getDictionary()
{
return dictionary;
}

public int getGroupId(int position)
{
return processed[position];
}

public boolean isProcessed(int position)
{
return processed[position] != -1;
}

public void setProcessed(int position, int groupId)
{
processed[position] = groupId;
}
}
}
Loading