Skip to content

Commit 919bf32

Browse files
committed
[SPARK-13325][SQL] Create a 64-bit hashcode expression
This PR introduces a 64-bit hashcode expression. Such an expression is especially usefull for HyperLogLog++ and other probabilistic datastructures. I have implemented xxHash64 which is a 64-bit hashing algorithm created by Yann Colet and Mathias Westerdahl. This is a high speed (C implementation runs at memory bandwidth) and high quality hashcode. It exploits both Instruction Level Parralellism (for speed) and the multiplication and rotation techniques (for quality) like MurMurHash does. The initial results are promising. I have added a CG'ed test to the `HashBenchmark`, and this results in the following results (running from SBT): Running benchmark: Hash For simple Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For simple: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 1011 / 1016 132.8 7.5 1.0X codegen version 1864 / 1869 72.0 13.9 0.5X codegen version 64-bit 1614 / 1644 83.2 12.0 0.6X Running benchmark: Hash For normal Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For normal: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 2467 / 2475 0.9 1176.1 1.0X codegen version 2008 / 2115 1.0 957.5 1.2X codegen version 64-bit 728 / 758 2.9 347.0 3.4X Running benchmark: Hash For array Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 1544 / 1707 0.1 11779.6 1.0X codegen version 2728 / 2745 0.0 20815.5 0.6X codegen version 64-bit 2508 / 2549 0.1 19132.8 0.6X Running benchmark: Hash For map Running case: interpreted version Running case: codegen version Running case: codegen version 64-bit Intel(R) Core(TM) i7-4750HQ CPU 2.00GHz Hash For map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- interpreted version 1819 / 1826 0.0 444014.3 1.0X codegen version 183 / 194 0.0 44642.9 9.9X codegen version 64-bit 173 / 174 0.0 42120.9 10.5X This shows that algorithm is consistently faster than MurMurHash32 in all cases and up to 3x (!) in the normal case. I have also added this to HyperLogLog++ and it cuts the processing time of the following code in half: val df = sqlContext.range(1<<25).agg(approxCountDistinct("id")) df.explain() val t = System.nanoTime() df.show() val ns = System.nanoTime() - t // Before ns: Long = 5821524302 // After ns: Long = 2836418963 cc cloud-fan (you have been working on hashcodes) / rxin Author: Herman van Hovell <[email protected]> Closes #11209 from hvanhovell/xxHash.
1 parent 8c82688 commit 919bf32

File tree

7 files changed

+713
-110
lines changed

7 files changed

+713
-110
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.catalyst.expressions;
18+
19+
import org.apache.spark.unsafe.Platform;
20+
import org.apache.spark.util.SystemClock;
21+
22+
// scalastyle: off
23+
24+
/**
25+
* xxHash64. A high quality and fast 64 bit hash code by Yann Colet and Mathias Westerdahl. The
26+
* class below is modelled like its Murmur3_x86_32 cousin.
27+
* <p/>
28+
* This was largely based on the following (original) C and Java implementations:
29+
* https://github.com/Cyan4973/xxHash/blob/master/xxhash.c
30+
* https://github.com/OpenHFT/Zero-Allocation-Hashing/blob/master/src/main/java/net/openhft/hashing/XxHash_r39.java
31+
* https://github.com/airlift/slice/blob/master/src/main/java/io/airlift/slice/XxHash64.java
32+
*/
33+
// scalastyle: on
34+
public final class XXH64 {
35+
36+
private static final long PRIME64_1 = 0x9E3779B185EBCA87L;
37+
private static final long PRIME64_2 = 0xC2B2AE3D27D4EB4FL;
38+
private static final long PRIME64_3 = 0x165667B19E3779F9L;
39+
private static final long PRIME64_4 = 0x85EBCA77C2B2AE63L;
40+
private static final long PRIME64_5 = 0x27D4EB2F165667C5L;
41+
42+
private final long seed;
43+
44+
public XXH64(long seed) {
45+
super();
46+
this.seed = seed;
47+
}
48+
49+
@Override
50+
public String toString() {
51+
return "xxHash64(seed=" + seed + ")";
52+
}
53+
54+
public long hashInt(int input) {
55+
return hashInt(input, seed);
56+
}
57+
58+
public static long hashInt(int input, long seed) {
59+
long hash = seed + PRIME64_5 + 4L;
60+
hash ^= (input & 0xFFFFFFFFL) * PRIME64_1;
61+
hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
62+
return fmix(hash);
63+
}
64+
65+
public long hashLong(long input) {
66+
return hashLong(input, seed);
67+
}
68+
69+
public static long hashLong(long input, long seed) {
70+
long hash = seed + PRIME64_5 + 8L;
71+
hash ^= Long.rotateLeft(input * PRIME64_2, 31) * PRIME64_1;
72+
hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
73+
return fmix(hash);
74+
}
75+
76+
public long hashUnsafeWords(Object base, long offset, int length) {
77+
return hashUnsafeWords(base, offset, length, seed);
78+
}
79+
80+
public static long hashUnsafeWords(Object base, long offset, int length, long seed) {
81+
assert (length % 8 == 0) : "lengthInBytes must be a multiple of 8 (word-aligned)";
82+
long hash = hashBytesByWords(base, offset, length, seed);
83+
return fmix(hash);
84+
}
85+
86+
public long hashUnsafeBytes(Object base, long offset, int length) {
87+
return hashUnsafeBytes(base, offset, length, seed);
88+
}
89+
90+
public static long hashUnsafeBytes(Object base, long offset, int length, long seed) {
91+
assert (length >= 0) : "lengthInBytes cannot be negative";
92+
long hash = hashBytesByWords(base, offset, length, seed);
93+
long end = offset + length;
94+
offset += length & -8;
95+
96+
if (offset + 4L <= end) {
97+
hash ^= (Platform.getInt(base, offset) & 0xFFFFFFFFL) * PRIME64_1;
98+
hash = Long.rotateLeft(hash, 23) * PRIME64_2 + PRIME64_3;
99+
offset += 4L;
100+
}
101+
102+
while (offset < end) {
103+
hash ^= (Platform.getByte(base, offset) & 0xFFL) * PRIME64_5;
104+
hash = Long.rotateLeft(hash, 11) * PRIME64_1;
105+
offset++;
106+
}
107+
return fmix(hash);
108+
}
109+
110+
private static long fmix(long hash) {
111+
hash ^= hash >>> 33;
112+
hash *= PRIME64_2;
113+
hash ^= hash >>> 29;
114+
hash *= PRIME64_3;
115+
hash ^= hash >>> 32;
116+
return hash;
117+
}
118+
119+
private static long hashBytesByWords(Object base, long offset, int length, long seed) {
120+
long end = offset + length;
121+
long hash;
122+
if (length >= 32) {
123+
long limit = end - 32;
124+
long v1 = seed + PRIME64_1 + PRIME64_2;
125+
long v2 = seed + PRIME64_2;
126+
long v3 = seed;
127+
long v4 = seed - PRIME64_1;
128+
129+
do {
130+
v1 += Platform.getLong(base, offset) * PRIME64_2;
131+
v1 = Long.rotateLeft(v1, 31);
132+
v1 *= PRIME64_1;
133+
134+
v2 += Platform.getLong(base, offset + 8) * PRIME64_2;
135+
v2 = Long.rotateLeft(v2, 31);
136+
v2 *= PRIME64_1;
137+
138+
v3 += Platform.getLong(base, offset + 16) * PRIME64_2;
139+
v3 = Long.rotateLeft(v3, 31);
140+
v3 *= PRIME64_1;
141+
142+
v4 += Platform.getLong(base, offset + 24) * PRIME64_2;
143+
v4 = Long.rotateLeft(v4, 31);
144+
v4 *= PRIME64_1;
145+
146+
offset += 32L;
147+
} while (offset <= limit);
148+
149+
hash = Long.rotateLeft(v1, 1)
150+
+ Long.rotateLeft(v2, 7)
151+
+ Long.rotateLeft(v3, 12)
152+
+ Long.rotateLeft(v4, 18);
153+
154+
v1 *= PRIME64_2;
155+
v1 = Long.rotateLeft(v1, 31);
156+
v1 *= PRIME64_1;
157+
hash ^= v1;
158+
hash = hash * PRIME64_1 + PRIME64_4;
159+
160+
v2 *= PRIME64_2;
161+
v2 = Long.rotateLeft(v2, 31);
162+
v2 *= PRIME64_1;
163+
hash ^= v2;
164+
hash = hash * PRIME64_1 + PRIME64_4;
165+
166+
v3 *= PRIME64_2;
167+
v3 = Long.rotateLeft(v3, 31);
168+
v3 *= PRIME64_1;
169+
hash ^= v3;
170+
hash = hash * PRIME64_1 + PRIME64_4;
171+
172+
v4 *= PRIME64_2;
173+
v4 = Long.rotateLeft(v4, 31);
174+
v4 *= PRIME64_1;
175+
hash ^= v4;
176+
hash = hash * PRIME64_1 + PRIME64_4;
177+
} else {
178+
hash = seed + PRIME64_5;
179+
}
180+
181+
hash += length;
182+
183+
long limit = end - 8;
184+
while (offset <= limit) {
185+
long k1 = Platform.getLong(base, offset);
186+
hash ^= Long.rotateLeft(k1 * PRIME64_2, 31) * PRIME64_1;
187+
hash = Long.rotateLeft(hash, 27) * PRIME64_1 + PRIME64_4;
188+
offset += 8L;
189+
}
190+
return hash;
191+
}
192+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ case class HyperLogLogPlusPlus(
169169
val v = child.eval(input)
170170
if (v != null) {
171171
// Create the hashed value 'x'.
172-
val x = MurmurHash.hash64(v)
172+
val x = XxHash64Function.hash(v, child.dataType, 42L)
173173

174174
// Determine the index of the register we are going to use.
175175
val idx = (x >>> idxShift).toInt

0 commit comments

Comments
 (0)