Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submission #3: jincongho #482

Merged
merged 1 commit into from
Jan 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Changelog (based on Macbook Pro Intel i7 6-cores 2.6GHz):
Expand Down Expand Up @@ -123,13 +122,31 @@ public static int findDelimiter(MemorySegment data, long offset) {
// }

// scalar implementation
// public static int hashCode(final MemorySegment array, final long offset, final short length) {
// final long limit = offset + length;
// int h = 1;
// for (long i = offset; i < limit; i++) {
// h = 31 * h + UNSAFE.getByte(array.address() + i);
// }
// return h;
// }

// fxhash
public static int hashCode(final MemorySegment array, final long offset, final short length) {
final long limit = offset + length;
int h = 1;
for (long i = offset; i < limit; i++) {
h = 31 * h + UNSAFE.getByte(array.address() + i);
final int seed = 0x9E3779B9;
final int rotate = 5;

int x, y;
if (length >= Integer.BYTES) {
x = UNSAFE.getInt(array.address() + offset);
y = UNSAFE.getInt(array.address() + offset + length - Integer.BYTES);
}
return h;
else {
x = UNSAFE.getByte(array.address() + offset);
y = UNSAFE.getByte(array.address() + offset + length - Byte.BYTES);
}

return (Integer.rotateLeft(x * seed, rotate) ^ y) * seed;
}

/** Vectorized Key Comparison **/
Expand Down Expand Up @@ -209,7 +226,7 @@ public void update(MemorySegment key, long keyStart, short keyLength, int keyHas
}
else {
index = (index + 1) & KEY_MASK;
keyOffset += KEY_SIZE;
keyOffset = KEYS.address() + (index * KEY_SIZE);
}
}

Expand Down Expand Up @@ -254,7 +271,7 @@ public void mergeTo(ResultAggr result) {
* Measurement Aggregation (for all partitions)
* Simple Concurrent Hash Table so all partitions can merge concurrently
*/
protected static class ResultAggr extends ConcurrentHashMap<ResultAggr.ByteKey, ResultAggr.Measurement> {
protected static class ResultAggr extends HashMap<ResultAggr.ByteKey, ResultAggr.Measurement> {

public static class ByteKey implements Comparable<ByteKey> {
private final MemorySegment data;
Expand All @@ -270,10 +287,8 @@ public ByteKey(MemorySegment data, long offset, short length) {

@Override
public boolean equals(Object other) {
if (length != ((ByteKey) other).length)
return false;

return !VectorUtils.notEquals(data, offset, ((ByteKey) other).data, ((ByteKey) other).offset, length, VectorUtils.BYTE_SPECIES);
return (length == ((ByteKey) other).length)
&& !VectorUtils.notEquals(data, offset, ((ByteKey) other).data, ((ByteKey) other).offset, length, VectorUtils.BYTE_SPECIES);
}

@Override
Expand Down Expand Up @@ -311,8 +326,8 @@ public String toString() {

}

public ResultAggr(int initialCapacity, float loadFactor, int concurrencyLevel) {
super(initialCapacity, loadFactor, concurrencyLevel);
public ResultAggr(int initialCapacity, float loadFactor) {
super(initialCapacity, loadFactor);
}

public Map toSorted() {
Expand All @@ -326,9 +341,9 @@ protected static class Partition implements Runnable {
private final MemorySegment data;
private long offset;
private final long limit;
private final ResultAggr result;
private final PartitionAggr result;

public Partition(MemorySegment data, long offset, long limit, ResultAggr result) {
public Partition(MemorySegment data, long offset, long limit, PartitionAggr result) {
this.data = data;
this.offset = offset;
this.limit = limit;
Expand All @@ -338,7 +353,7 @@ public Partition(MemorySegment data, long offset, long limit, ResultAggr result)
@Override
public void run() {
// measurement parsing
PartitionAggr aggr = new PartitionAggr();
final PartitionAggr aggr = this.result;

// main loop (vectorized)
final long loopLimit = limit - (VectorUtils.BYTE_SPECIES.length() * Math.ceilDiv(100, VectorUtils.BYTE_SPECIES.length()) + Long.BYTES);
Expand Down Expand Up @@ -402,7 +417,7 @@ public void run() {
}

// measurement result collection
aggr.mergeTo(result);
// aggr.mergeTo(result);
}

}
Expand Down Expand Up @@ -435,15 +450,25 @@ public static void main(String[] args) throws IOException, InterruptedException

// partition aggregation
var threadList = new Thread[processors];
ResultAggr result = new ResultAggr(1 << 14, 1, processors);
PartitionAggr[] partAggrs = new PartitionAggr[processors];
for (int i = 0; i < processors; i++) {
threadList[i] = new Thread(new Partition(data, partition[i], partition[i + 1], result));
if (partition[i] == data.byteSize())
break;

partAggrs[i] = new PartitionAggr();
threadList[i] = new Thread(new Partition(data, partition[i], partition[i + 1], partAggrs[i]));
threadList[i].start();
}
for (var thread : threadList) {
thread.join();
}

// result
ResultAggr result = new ResultAggr(1 << 14, 1);
for (int i = 0; i < processors; i++) {
if (partition[i] == data.byteSize())
break;

threadList[i].join();
partAggrs[i].mergeTo(result);
}
System.out.println(result.toSorted());
}

Expand Down
Loading