Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -682,11 +682,15 @@ public Void getResult()
}

class AddLowCardinalityDictionaryPageWork
extends LowCardinalityDictionaryWork<Void>
implements Work<Void>
{
private final Page page;
private int[] combinationIdToPosition;
private int nextCombinationIndex;
Comment thread
sopel39 marked this conversation as resolved.
Outdated

public AddLowCardinalityDictionaryPageWork(Page page)
{
super(page);
this.page = requireNonNull(page, "page is null");
}

@Override
Expand All @@ -698,18 +702,20 @@ public boolean process()
return false;
}

int[] combinationIdToPosition = new int[maxCardinality];
Arrays.fill(combinationIdToPosition, -1);
calculateCombinationIdsToPositionMapping(combinationIdToPosition);
if (combinationIdToPosition == null) {
combinationIdToPosition = calculateCombinationIdToPositionMapping(page);
}

// putIfAbsent will rehash automatically if rehash is needed, unless there isn't enough memory to do so.
// Therefore needRehash will not generally return true even if we have just crossed the capacity boundary.
for (int i = 0; i < maxCardinality; i++) {
if (needRehash()) {
return false;
}
if (combinationIdToPosition[i] != -1) {
putIfAbsent(combinationIdToPosition[i], page);
for (int combinationIndex = nextCombinationIndex; combinationIndex < combinationIdToPosition.length; combinationIndex++) {
Comment thread
sopel39 marked this conversation as resolved.
Outdated
int position = combinationIdToPosition[combinationIndex];
if (position != -1) {
if (needRehash()) {
nextCombinationIndex = combinationIndex;
return false;
}
putIfAbsent(position, page);
}
}
return true;
Expand Down Expand Up @@ -816,14 +822,18 @@ public GroupByIdBlock getResult()

@VisibleForTesting
class GetLowCardinalityDictionaryGroupIdsWork
extends LowCardinalityDictionaryWork<GroupByIdBlock>
implements Work<GroupByIdBlock>
{
private final Page page;
private final long[] groupIds;
private short[] positionToCombinationIds;
private int[] combinationIdToGroupId;
private int nextPosition;
private boolean finished;

public GetLowCardinalityDictionaryGroupIdsWork(Page page)
{
super(page);
this.page = requireNonNull(page, "page is null");
groupIds = new long[page.getPositionCount()];
}

Expand All @@ -836,27 +846,27 @@ public boolean process()
return false;
}

int positionCount = page.getPositionCount();
int[] combinationIdToPosition = new int[maxCardinality];
Arrays.fill(combinationIdToPosition, -1);
short[] positionToCombinationId = calculateCombinationIdsToPositionMapping(combinationIdToPosition);
int[] combinationIdToGroupId = new int[maxCardinality];
if (positionToCombinationIds == null) {
positionToCombinationIds = new short[groupIds.length];
int maxCardinality = calculatePositionToCombinationIdMapping(page, positionToCombinationIds);
combinationIdToGroupId = new int[maxCardinality];
Arrays.fill(combinationIdToGroupId, -1);
}

// putIfAbsent will rehash automatically if rehash is needed, unless there isn't enough memory to do so.
// Therefore needRehash will not generally return true even if we have just crossed the capacity boundary.
for (int i = 0; i < maxCardinality; i++) {
if (needRehash()) {
return false;
}
if (combinationIdToPosition[i] != -1) {
combinationIdToGroupId[i] = putIfAbsent(combinationIdToPosition[i], page);
for (int position = nextPosition; position < groupIds.length; position++) {
short combinationId = positionToCombinationIds[position];
int groupId = combinationIdToGroupId[combinationId];
if (groupId == -1) {
// putIfAbsent will rehash automatically if rehash is needed, unless there isn't enough memory to do so.
// Therefore needRehash will not generally return true even if we have just crossed the capacity boundary.
if (needRehash()) {
nextPosition = position;
return false;
}
groupId = putIfAbsent(position, page);
combinationIdToGroupId[combinationId] = groupId;
}
else {
combinationIdToGroupId[i] = -1;
}
}
for (int i = 0; i < positionCount; i++) {
groupIds[i] = combinationIdToGroupId[positionToCombinationId[i]];
groupIds[position] = groupId;
}
return true;
}
Expand Down Expand Up @@ -980,55 +990,54 @@ public GroupByIdBlock getResult()
}
}

private abstract class LowCardinalityDictionaryWork<T>
implements Work<T>
/**
* Returns an array indexed by low cardinality dictionary combinationId. Each element holds a
* single Page position at which that combinationId occurs, or -1 if the combinationId does
* not occur at any position within the page.
*/
private int[] calculateCombinationIdToPositionMapping(Page page)
{
protected final Page page;
protected final int maxCardinality;
protected final int[] dictionarySizes;
protected final DictionaryBlock[] blocks;
short[] positionToCombinationIds = new short[page.getPositionCount()];
int maxCardinality = calculatePositionToCombinationIdMapping(page, positionToCombinationIds);

public LowCardinalityDictionaryWork(Page page)
{
this.page = requireNonNull(page, "page is null");
dictionarySizes = new int[channels.length];
blocks = new DictionaryBlock[channels.length];
int maxCardinality = 1;
for (int i = 0; i < channels.length; i++) {
Block block = page.getBlock(channels[i]);
verify(block instanceof DictionaryBlock, "Only dictionary blocks are supported");
blocks[i] = (DictionaryBlock) block;
int blockPositionCount = blocks[i].getDictionary().getPositionCount();
dictionarySizes[i] = blockPositionCount;
maxCardinality *= blockPositionCount;
}
this.maxCardinality = maxCardinality;
int[] combinationIdToPosition = new int[maxCardinality];
Arrays.fill(combinationIdToPosition, -1);
for (int position = 0; position < positionToCombinationIds.length; position++) {
combinationIdToPosition[positionToCombinationIds[position]] = position;
}
return combinationIdToPosition;
}

/**
* Returns the combination of all dictionary ids for every position and populates the
* combinationIdToPosition array with a single occurrence of every used combination
*/
protected short[] calculateCombinationIdsToPositionMapping(int[] combinationIdToPosition)
{
int positionCount = page.getPositionCount();
// short arrays improve performance compared to int
short[] combinationIds = new short[positionCount];

for (int i = 0; i < positionCount; i++) {
combinationIds[i] = (short) blocks[0].getId(i);
}
for (int j = 1; j < channels.length; j++) {
for (int i = 0; i < positionCount; i++) {
combinationIds[i] *= dictionarySizes[j];
combinationIds[i] += blocks[j].getId(i);
/**
* Populates positionToCombinationIds with the combinationId (the combination of all dictionary ids)
* for each position in the input Page
* @return the maximum cardinality, i.e. the product of the dictionary sizes of all channels, which bounds the number of combinationIds that could be present
*/
private int calculatePositionToCombinationIdMapping(Page page, short[] positionToCombinationIds)
{
checkArgument(positionToCombinationIds.length == page.getPositionCount());

int maxCardinality = 1;
for (int channel = 0; channel < channels.length; channel++) {
Block block = page.getBlock(channels[channel]);
verify(block instanceof DictionaryBlock, "Only dictionary blocks are supported");
DictionaryBlock dictionaryBlock = (DictionaryBlock) block;
int dictionarySize = dictionaryBlock.getDictionary().getPositionCount();
maxCardinality *= dictionarySize;
if (channel == 0) {
for (int position = 0; position < positionToCombinationIds.length; position++) {
positionToCombinationIds[position] = (short) dictionaryBlock.getId(position);
}
}

for (int i = 0; i < positionCount; i++) {
combinationIdToPosition[combinationIds[i]] = i;
else {
for (int position = 0; position < positionToCombinationIds.length; position++) {
short combinationId = positionToCombinationIds[position];
combinationId *= dictionarySize;
combinationId += dictionaryBlock.getId(position);
positionToCombinationIds[position] = combinationId;
}
}
return combinationIds;
}
return maxCardinality;
}
}