Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;

// This implementation assumes:
// -There is only one join channel and it is of type bigint
// -arrays used in the hash are always a power of 2.
/**
* This implementation assumes:
* -There is only one join channel and it is of type bigint
* -arrays used in the hash are always a power of 2.
*/
public final class BigintPagesHash
implements PagesHash
{
Expand All @@ -49,7 +51,7 @@ public final class BigintPagesHash
private final PagesHashStrategy pagesHashStrategy;

private final int mask;
private final int[] key;
private final int[] keys;
private final long[] values;
private final long size;

Expand Down Expand Up @@ -77,9 +79,9 @@ public BigintPagesHash(
int hashSize = hashArraySizeSupplier.getHashArraySize(addresses.size());

mask = hashSize - 1;
key = new int[hashSize];
keys = new int[hashSize];
values = new long[addresses.size()];
Arrays.fill(key, -1);
Arrays.fill(keys, -1);

// We will process addresses in batches, to improve spatial and temporal memory locality
int positionsInStep = Math.min(addresses.size() + 1, (int) CACHE_SIZE.toBytes() / Integer.SIZE);
Expand All @@ -91,26 +93,26 @@ public BigintPagesHash(
int stepSize = stepEndPosition - stepBeginPosition;

// index pages
for (int position = 0; position < stepSize; position++) {
int realPosition = position + stepBeginPosition;
if (isPositionNull(realPosition)) {
for (int batchIndex = 0; batchIndex < stepSize; batchIndex++) {
int addressIndex = batchIndex + stepBeginPosition;
if (isPositionNull(addressIndex)) {
continue;
}

long address = addresses.getLong(realPosition);
long address = addresses.getLong(addressIndex);
int blockIndex = decodeSliceIndex(address);
int blockPosition = decodePosition(address);
long value = joinChannelBlocks.get(blockIndex).getLong(blockPosition, 0);

int pos = getHashPosition(value, mask);

// look for an empty slot or a slot containing this key
while (key[pos] != -1) {
int currentKey = key[pos];
while (keys[pos] != -1) {
int currentKey = keys[pos];
if (value == values[currentKey]) {
// found a slot for this key
// link the new key position to the current key position
realPosition = positionLinks.link(realPosition, currentKey);
addressIndex = positionLinks.link(addressIndex, currentKey);

// key[pos] updated outside of this loop
break;
Expand All @@ -120,13 +122,13 @@ public BigintPagesHash(
hashCollisionsLocal++;
}

key[pos] = realPosition;
values[realPosition] = value;
keys[pos] = addressIndex;
values[addressIndex] = value;
}
}

size = sizeOf(addresses.elements()) + pagesHashStrategy.getSizeInBytes() +
sizeOf(key) + sizeOf(values);
sizeOf(keys) + sizeOf(values);
hashCollisions = hashCollisionsLocal;
expectedHashCollisions = estimateNumberOfHashCollisions(addresses.size(), hashSize);
}
Expand Down Expand Up @@ -167,9 +169,9 @@ public int getAddressIndex(int position, Page hashChannelsPage)
long value = hashChannelsPage.getBlock(0).getLong(position, 0);
int pos = getHashPosition(value, mask);

while (key[pos] != -1) {
if (value == values[key[pos]]) {
return key[pos];
while (keys[pos] != -1) {
if (value == values[keys[pos]]) {
return keys[pos];
}
// increment position and mask to handler wrap around
pos = (pos + 1) & mask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class DefaultPagesHash
private final PagesHashStrategy pagesHashStrategy;

private final int mask;
private final int[] key;
private final int[] keys;
private final long size;

// Native array of hashes for faster collisions resolution compared
Expand All @@ -69,12 +69,12 @@ public DefaultPagesHash(
int hashSize = hashArraySizeSupplier.getHashArraySize(addresses.size());

mask = hashSize - 1;
key = new int[hashSize];
Arrays.fill(key, -1);
keys = new int[hashSize];
Arrays.fill(keys, -1);

positionToHashes = new byte[addresses.size()];

// We will process addresses in batches, to save memory on array of hashes.
// We will process addresses in batches, to save memory on array of hashes and improve memory locality.
int positionsInStep = Math.min(addresses.size() + 1, (int) CACHE_SIZE.toBytes() / Integer.SIZE);
long[] positionToFullHashes = new long[positionsInStep];
long hashCollisionsLocal = 0;
Expand All @@ -87,11 +87,11 @@ public DefaultPagesHash(
// First extract all hashes from blocks to native array.
// Somehow having this as a separate loop is much faster compared
// to extracting hashes on the fly in the loop below.
for (int position = 0; position < stepSize; position++) {
int realPosition = position + stepBeginPosition;
long hash = readHashPosition(realPosition);
positionToFullHashes[position] = hash;
positionToHashes[realPosition] = (byte) hash;
for (int batchIndex = 0; batchIndex < stepSize; batchIndex++) {
int addressIndex = batchIndex + stepBeginPosition;
long hash = readHashPosition(addressIndex);
positionToFullHashes[batchIndex] = hash;
positionToHashes[addressIndex] = (byte) hash;
}

// index pages
Expand All @@ -105,8 +105,8 @@ public DefaultPagesHash(
int pos = getHashPosition(hash, mask);

// look for an empty slot or a slot containing this key
while (key[pos] != -1) {
int currentKey = key[pos];
while (keys[pos] != -1) {
int currentKey = keys[pos];
if (((byte) hash) == positionToHashes[currentKey] && positionEqualsPositionIgnoreNulls(currentKey, realPosition)) {
// found a slot for this key
// link the new key position to the current key position
Expand All @@ -120,12 +120,12 @@ public DefaultPagesHash(
hashCollisionsLocal++;
}

key[pos] = realPosition;
keys[pos] = realPosition;
}
}

size = sizeOf(addresses.elements()) + pagesHashStrategy.getSizeInBytes() +
sizeOf(key) + sizeOf(positionToHashes);
sizeOf(keys) + sizeOf(positionToHashes);
hashCollisions = hashCollisionsLocal;
expectedHashCollisions = estimateNumberOfHashCollisions(addresses.size(), hashSize);
}
Expand Down Expand Up @@ -165,9 +165,9 @@ public int getAddressIndex(int rightPosition, Page hashChannelsPage, long rawHas
{
int pos = getHashPosition(rawHash, mask);

while (key[pos] != -1) {
if (positionEqualsCurrentRowIgnoreNulls(key[pos], (byte) rawHash, rightPosition, hashChannelsPage)) {
return key[pos];
while (keys[pos] != -1) {
if (positionEqualsCurrentRowIgnoreNulls(keys[pos], (byte) rawHash, rightPosition, hashChannelsPage)) {
return keys[pos];
}
// increment position and mask to handler wrap around
pos = (pos + 1) & mask;
Expand Down