From 336052c9e23750004958528df0806b5cbb85b9d3 Mon Sep 17 00:00:00 2001 From: Jin Cong Ho Date: Thu, 18 Jan 2024 22:46:08 +0000 Subject: [PATCH] Submission #3: jincongho --- .../onebrc/CalculateAverage_jincongho.java | 71 +++++++++++++------ 1 file changed, 48 insertions(+), 23 deletions(-) diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java b/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java index d2a7e6609..0758703bc 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_jincongho.java @@ -31,7 +31,6 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; /** * Changelog (based on Macbook Pro Intel i7 6-cores 2.6GHz): @@ -123,13 +122,31 @@ public static int findDelimiter(MemorySegment data, long offset) { // } // scalar implementation + // public static int hashCode(final MemorySegment array, final long offset, final short length) { + // final long limit = offset + length; + // int h = 1; + // for (long i = offset; i < limit; i++) { + // h = 31 * h + UNSAFE.getByte(array.address() + i); + // } + // return h; + // } + + // fxhash public static int hashCode(final MemorySegment array, final long offset, final short length) { - final long limit = offset + length; - int h = 1; - for (long i = offset; i < limit; i++) { - h = 31 * h + UNSAFE.getByte(array.address() + i); + final int seed = 0x9E3779B9; + final int rotate = 5; + + int x, y; + if (length >= Integer.BYTES) { + x = UNSAFE.getInt(array.address() + offset); + y = UNSAFE.getInt(array.address() + offset + length - Integer.BYTES); } - return h; + else { + x = UNSAFE.getByte(array.address() + offset); + y = UNSAFE.getByte(array.address() + offset + length - Byte.BYTES); + } + + return (Integer.rotateLeft(x * seed, rotate) ^ y) * seed; } /** Vectorized Key Comparison **/ @@ -209,7 +226,7 @@ public void update(MemorySegment key, long keyStart, short keyLength, int keyHas } else { index = (index + 1) & KEY_MASK; - keyOffset += KEY_SIZE; + keyOffset = KEYS.address() + (index * KEY_SIZE); } } @@ -254,7 +271,7 @@ public void mergeTo(ResultAggr result) { * Measurement Aggregation (for all partitions) * Simple Concurrent Hash Table so all partitions can merge concurrently */ - protected static class ResultAggr extends ConcurrentHashMap { + protected static class ResultAggr extends HashMap { public static class ByteKey implements Comparable { private final MemorySegment data; @@ -270,10 +287,8 @@ public ByteKey(MemorySegment data, long offset, short length) { @Override public boolean equals(Object other) { - if (length != ((ByteKey) other).length) - return false; - - return !VectorUtils.notEquals(data, offset, ((ByteKey) other).data, ((ByteKey) other).offset, length, VectorUtils.BYTE_SPECIES); + return (length == ((ByteKey) other).length) + && !VectorUtils.notEquals(data, offset, ((ByteKey) other).data, ((ByteKey) other).offset, length, VectorUtils.BYTE_SPECIES); } @Override @@ -311,8 +326,8 @@ public String toString() { } - public ResultAggr(int initialCapacity, float loadFactor, int concurrencyLevel) { - super(initialCapacity, loadFactor, concurrencyLevel); + public ResultAggr(int initialCapacity, float loadFactor) { + super(initialCapacity, loadFactor); } public Map toSorted() { @@ -326,9 +341,9 @@ protected static class Partition implements Runnable { private final MemorySegment data; private long offset; private final long limit; - private final ResultAggr result; + private final PartitionAggr result; - public Partition(MemorySegment data, long offset, long limit, ResultAggr result) { + public Partition(MemorySegment data, long offset, long limit, PartitionAggr result) { this.data = data; this.offset = offset; this.limit = limit; @@ -338,7 +353,7 @@ public Partition(MemorySegment data, long offset, long limit, ResultAggr result) @Override public void run() { // measurement parsing - PartitionAggr aggr = new PartitionAggr(); + final PartitionAggr aggr = this.result; // main loop (vectorized) final long loopLimit = limit - (VectorUtils.BYTE_SPECIES.length() * Math.ceilDiv(100, VectorUtils.BYTE_SPECIES.length()) + Long.BYTES); @@ -402,7 +417,7 @@ public void run() { } // measurement result collection - aggr.mergeTo(result); + // aggr.mergeTo(result); } } @@ -435,15 +450,25 @@ public static void main(String[] args) throws IOException, InterruptedException // partition aggregation var threadList = new Thread[processors]; - ResultAggr result = new ResultAggr(1 << 14, 1, processors); + PartitionAggr[] partAggrs = new PartitionAggr[processors]; for (int i = 0; i < processors; i++) { - threadList[i] = new Thread(new Partition(data, partition[i], partition[i + 1], result)); + if (partition[i] == data.byteSize()) + break; + + partAggrs[i] = new PartitionAggr(); + threadList[i] = new Thread(new Partition(data, partition[i], partition[i + 1], partAggrs[i])); threadList[i].start(); } - for (var thread : threadList) { - thread.join(); - } + // result + ResultAggr result = new ResultAggr(1 << 14, 1); + for (int i = 0; i < processors; i++) { + if (partition[i] == data.byteSize()) + break; + + threadList[i].join(); + partAggrs[i].mergeTo(result); + } System.out.println(result.toSorted()); }