
Commit a08d8b0

ishnagy authored and peter-toth committed
[SPARK-47547][CORE] Add BloomFilter V2 and use it as default
### What changes were proposed in this pull request?

This change fixes a performance degradation issue in the current BloomFilter implementation.

The current bit index calculation logic does not use any part of the indexable space above the first 31 bits, so when the inserted item count approaches (or exceeds) Integer.MAX_VALUE, it produces significantly worse collision rates than an (ideal) uniformly distributing hash function.

### Why are the changes needed?

This should qualify as a bug.

The upper bound on the bit capacity of the current BloomFilter implementation in Spark is approx. 137G bits (64-bit longs in an Integer.MAX_VALUE sized array). The current indexing scheme can only address about 2G bits of these.

On the other hand, due to the way the BloomFilters are used, the bug won't cause any logical errors; it will gradually render the BloomFilter instance useless by forcing more and more queries onto the slow path.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

#### new test

One new Java test class was added to `sketch` to test different combinations of item counts and expected fpp rates.

```
common/sketch/src/test/java/org/apache/spark/util/sketch/TestSparkBloomFilter.java
```

`testAccuracyEvenOdd`, in N iterations, inserts N even numbers (2\*i) and leaves out N odd numbers (2\*i+1) from the BloomFilter.

The test checks the 100% accuracy of `mightContain=true` on all of the even items, and measures the `mightContain=true` (false positive) rate on the not-inserted odd numbers.

`testAccuracyRandom`, in 2N iterations, inserts N pseudorandomly generated numbers into two differently seeded (theoretically independent) BloomFilter instances. All the random numbers generated in an even iteration are inserted into both filters; all the random numbers generated in an odd iteration are left out from both.

The test checks the 100% accuracy of `mightContain=true` for all of the items inserted in an even loop. It counts the false positives as the number of odd-loop items for which the primary filter reports `mightContain=true` but the secondary reports `mightContain=false`. Since we inserted the same elements into both instances, and the secondary reports non-insertion, the `mightContain=true` from the primary can only be a false positive.

#### patched

One minor (test) issue was fixed in

```
common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
```

where the potential repetitions in the randomly generated stream of insertable items resulted in slightly worse fpp measurements than the actual rate. The problem affected test cases more strongly where the cardinality of the tested type is low (the chance of repetition is high), e.g. Byte and Short.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50933 from ishnagy/SPARK-47547_bloomfilter_fpp_degradation.

Authored-by: Ish Nagy <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
1 parent 23a19e6 commit a08d8b0
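
The addressing limit described in the commit message can be illustrated with a minimal sketch. The class `IndexRangeSketch` and its methods are hypothetical names, not part of Spark. `index32` assumes the general shape of a 32-bit combined-hash scheme, and `index64` shows one possible way to widen the combination to 64 bits; neither is claimed to be the exact code of `BloomFilterImpl` or `BloomFilterImplV2`.

```java
final class IndexRangeSketch {
  // Upper bound on capacity: Integer.MAX_VALUE longs of 64 bits each (about 137G bits).
  static final long MAX_BITS = 64L * Integer.MAX_VALUE;

  // Assumed 32-bit scheme: the combination happens in int arithmetic,
  // so the resulting index can never exceed Integer.MAX_VALUE (about 2G).
  static long index32(int h1, int h2, int i, long bitSize) {
    int combined = h1 + i * h2;
    if (combined < 0) {
      combined = ~combined; // flip negative values into the non-negative range
    }
    return combined % bitSize;
  }

  // Hypothetical widened scheme: combining in long arithmetic lets the
  // index reach positions above the first 2^31 bits.
  static long index64(int h1, int h2, int i, long bitSize) {
    long combined = (long) h1 * Integer.MAX_VALUE + (long) i * h2;
    if (combined < 0) {
      combined = ~combined;
    }
    return combined % bitSize;
  }

  public static void main(String[] args) {
    System.out.println("max bits   = " + MAX_BITS);
    System.out.println("32-bit idx = " + index32(Integer.MAX_VALUE, 0, 1, MAX_BITS));
    System.out.println("64-bit idx = " + index64(Integer.MAX_VALUE, 1, 1, MAX_BITS));
  }
}
```

With a filter sized near `MAX_BITS`, the 32-bit variant leaves roughly 63/64 of the bit array unreachable, which is the degradation the commit message describes.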

7 files changed: +916 -278 lines changed

common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java

Lines changed: 52 additions & 4 deletions
@@ -17,9 +17,12 @@

 package org.apache.spark.util.sketch;

+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;

 /**
  * A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
@@ -51,7 +54,22 @@ public enum Version {
      *   <li>The words/longs (numWords * 64 bit)</li>
      * </ul>
      */
-    V1(1);
+    V1(1),
+
+    /**
+     * {@code BloomFilter} binary format version 2.
+     * Fixes the int32 truncation issue with V1 indexes, but by changing the bit pattern,
+     * it will become incompatible with V1 serializations.
+     * All values written in big-endian order:
+     * <ul>
+     *   <li>Version number, always 2 (32 bit)</li>
+     *   <li>Number of hash functions (32 bit)</li>
+     *   <li>Integer seed to initialize hash functions (32 bit) </li>
+     *   <li>Total number of words of the underlying bit array (32 bit)</li>
+     *   <li>The words/longs (numWords * 64 bit)</li>
+     * </ul>
+     */
+    V2(2);

     private final int versionNumber;

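
The V2 layout listed in the Javadoc above can be sketched as a standalone writer. This is only an illustration of the documented field order; `V2LayoutSketch` and `write` are hypothetical names, and the actual serialization code of `BloomFilterImplV2` is not shown in this part of the diff. `DataOutputStream` is used because it writes multi-byte values in big-endian order.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class V2LayoutSketch {
  // Serializes the documented V2 fields in order: version, hash function count,
  // seed, word count, then the words themselves.
  static byte[] write(int numHashFunctions, int seed, long[] words) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      out.writeInt(2);                 // version number, always 2
      out.writeInt(numHashFunctions);  // number of hash functions
      out.writeInt(seed);              // seed for the hash functions
      out.writeInt(words.length);      // number of 64-bit words
      for (long word : words) {
        out.writeLong(word);           // the bit array contents
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  public static void main(String[] args) {
    byte[] bytes = write(3, 0, new long[] {1L, 2L});
    System.out.println("serialized size in bytes: " + bytes.length);
  }
}
```

The header occupies 16 bytes (four 32-bit values), followed by 8 bytes per word.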
@@ -175,14 +193,26 @@ public long cardinality() {
    * the stream.
    */
   public static BloomFilter readFrom(InputStream in) throws IOException {
-    return BloomFilterImpl.readFrom(in);
+    // peek into the InputStream so we can determine the version
+    BufferedInputStream bin = new BufferedInputStream(in);
+    bin.mark(4);
+    int version = ByteBuffer.wrap(bin.readNBytes(4)).getInt();
+    bin.reset();
+
+    return switch (version) {
+      case 1 -> BloomFilterImpl.readFrom(bin);
+      case 2 -> BloomFilterImplV2.readFrom(bin);
+      default -> throw new IllegalArgumentException("Unknown BloomFilter version: " + version);
+    };
   }

   /**
    * Reads in a {@link BloomFilter} from a byte array.
    */
   public static BloomFilter readFrom(byte[] bytes) throws IOException {
-    return BloomFilterImpl.readFrom(bytes);
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
+      return readFrom(bis);
+    }
   }

   /**
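
The mark/reset peek used in `readFrom` above can be tried in isolation. The following is a minimal sketch with hypothetical names (`VersionPeekSketch`, `peekVersion`, `peekThenReadAgain`); it adds an explicit length check as an assumption of what a defensive variant might do, since `ByteBuffer.getInt()` throws `BufferUnderflowException` when fewer than four bytes are available.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class VersionPeekSketch {
  // Reads the 4-byte big-endian version header, then rewinds the stream so the
  // version-specific reader still sees the header.
  static int peekVersion(BufferedInputStream in) throws IOException {
    in.mark(4);
    byte[] header = in.readNBytes(4);
    in.reset();
    if (header.length < 4) {
      throw new IllegalArgumentException("Stream too short to contain a version header");
    }
    return ByteBuffer.wrap(header).getInt();
  }

  // Returns {peeked version, version as re-read by a downstream reader}.
  static int[] peekThenReadAgain(byte[] serialized) {
    try (BufferedInputStream in =
        new BufferedInputStream(new ByteArrayInputStream(serialized))) {
      int peeked = peekVersion(in);
      int reread = new DataInputStream(in).readInt();
      return new int[] {peeked, reread};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] result = peekThenReadAgain(new byte[] {0, 0, 0, 2, 42});
    System.out.println("peeked=" + result[0] + " reread=" + result[1]);
  }
}
```

Because the stream is reset after the peek, both values in the returned pair are the same, which is what allows the dispatch to hand the unchanged stream to either implementation.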
@@ -256,6 +286,19 @@ public static BloomFilter create(long expectedNumItems, double fpp) {
    * pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
    */
   public static BloomFilter create(long expectedNumItems, long numBits) {
+    return create(Version.V2, expectedNumItems, numBits, BloomFilterImplV2.DEFAULT_SEED);
+  }
+
+  public static BloomFilter create(long expectedNumItems, long numBits, int seed) {
+    return create(Version.V2, expectedNumItems, numBits, seed);
+  }
+
+  public static BloomFilter create(
+      Version version,
+      long expectedNumItems,
+      long numBits,
+      int seed
+  ) {
     if (expectedNumItems <= 0) {
       throw new IllegalArgumentException("Expected insertions must be positive");
     }
@@ -264,6 +307,11 @@ public static BloomFilter create(long expectedNumItems, long numBits) {
       throw new IllegalArgumentException("Number of bits must be positive");
     }

-    return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
+    int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);
+
+    return switch (version) {
+      case V1 -> new BloomFilterImpl(numHashFunctions, numBits);
+      case V2 -> new BloomFilterImplV2(numHashFunctions, numBits, seed);
+    };
   }
 }
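
The body of `optimalNumOfHashFunctions` is not part of this hunk. As a rough guide to what such a helper computes, the sketch below uses the textbook Bloom filter result k = (m / n) · ln 2, clamped to at least 1. The class name `HashCountSketch` is hypothetical, and the rounding and clamping choices are assumptions rather than a confirmed copy of Spark's implementation.

```java
final class HashCountSketch {
  // Textbook estimate of the hash function count that minimizes the false
  // positive rate for n expected items and m bits: k = (m / n) * ln 2.
  static int optimalNumOfHashFunctions(long expectedNumItems, long numBits) {
    double ratio = (double) numBits / expectedNumItems;
    return Math.max(1, (int) Math.round(ratio * Math.log(2)));
  }

  public static void main(String[] args) {
    System.out.println(optimalNumOfHashFunctions(1_000_000L, 8_000_000L));
    System.out.println(optimalNumOfHashFunctions(1_000L, 100L));
  }
}
```

The value depends only on the ratio of bits to items, which is why both the V1 and V2 branches of the `switch` above can share the same `numHashFunctions`.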
Lines changed: 199 additions & 0 deletions
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.sketch;
+
+import java.util.Objects;
+
+abstract class BloomFilterBase extends BloomFilter {
+
+  public static final int DEFAULT_SEED = 0;
+
+  protected int seed;
+  protected int numHashFunctions;
+  protected BitArray bits;
+
+  protected BloomFilterBase(int numHashFunctions, long numBits) {
+    this(numHashFunctions, numBits, DEFAULT_SEED);
+  }
+
+  protected BloomFilterBase(int numHashFunctions, long numBits, int seed) {
+    this(new BitArray(numBits), numHashFunctions, seed);
+  }
+
+  protected BloomFilterBase(BitArray bits, int numHashFunctions, int seed) {
+    this.bits = bits;
+    this.numHashFunctions = numHashFunctions;
+    this.seed = seed;
+  }
+
+  protected BloomFilterBase() {}
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+
+    if (!(other instanceof BloomFilterBase that)) {
+      return false;
+    }
+
+    return
+      this.getClass() == that.getClass()
+      && this.numHashFunctions == that.numHashFunctions
+      && this.seed == that.seed
+      // TODO: this.bits can be null temporarily, during deserialization,
+      //  should we worry about this?
+      && this.bits.equals(that.bits);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(numHashFunctions, seed, bits);
+  }
+
+  @Override
+  public double expectedFpp() {
+    return Math.pow((double) bits.cardinality() / bits.bitSize(), numHashFunctions);
+  }
+
+  @Override
+  public long bitSize() {
+    return bits.bitSize();
+  }
+
+  @Override
+  public boolean put(Object item) {
+    if (item instanceof String str) {
+      return putString(str);
+    } else if (item instanceof byte[] bytes) {
+      return putBinary(bytes);
+    } else {
+      return putLong(Utils.integralToLong(item));
+    }
+  }
+
+  protected HiLoHash hashLongToIntPair(long item, int seed) {
+    // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
+    // hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
+    // Note that `CountMinSketch` use a different strategy, it hash the input long element with
+    // every i to produce n hash values.
+    // TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
+    int h1 = Murmur3_x86_32.hashLong(item, seed);
+    int h2 = Murmur3_x86_32.hashLong(item, h1);
+    return new HiLoHash(h1, h2);
+  }
+
+  protected HiLoHash hashBytesToIntPair(byte[] item, int seed) {
+    int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, seed);
+    int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
+    return new HiLoHash(h1, h2);
+  }
+
+  protected abstract boolean scatterHashAndSetAllBits(HiLoHash inputHash);
+
+  protected abstract boolean scatterHashAndGetAllBits(HiLoHash inputHash);
+
+  @Override
+  public boolean putString(String item) {
+    return putBinary(Utils.getBytesFromUTF8String(item));
+  }
+
+  @Override
+  public boolean putBinary(byte[] item) {
+    HiLoHash hiLoHash = hashBytesToIntPair(item, seed);
+    return scatterHashAndSetAllBits(hiLoHash);
+  }
+
+  @Override
+  public boolean mightContainString(String item) {
+    return mightContainBinary(Utils.getBytesFromUTF8String(item));
+  }
+
+  @Override
+  public boolean mightContainBinary(byte[] item) {
+    HiLoHash hiLoHash = hashBytesToIntPair(item, seed);
+    return scatterHashAndGetAllBits(hiLoHash);
+  }
+
+  public boolean putLong(long item) {
+    HiLoHash hiLoHash = hashLongToIntPair(item, seed);
+    return scatterHashAndSetAllBits(hiLoHash);
+  }
+
+  @Override
+  public boolean mightContainLong(long item) {
+    HiLoHash hiLoHash = hashLongToIntPair(item, seed);
+    return scatterHashAndGetAllBits(hiLoHash);
+  }
+
+  @Override
+  public boolean mightContain(Object item) {
+    if (item instanceof String str) {
+      return mightContainString(str);
+    } else if (item instanceof byte[] bytes) {
+      return mightContainBinary(bytes);
+    } else {
+      return mightContainLong(Utils.integralToLong(item));
+    }
+  }
+
+  @Override
+  public boolean isCompatible(BloomFilter other) {
+    if (other == null) {
+      return false;
+    }
+
+    if (!(other instanceof BloomFilterBase that)) {
+      return false;
+    }
+
+    return
+      this.getClass() == that.getClass()
+      && this.bitSize() == that.bitSize()
+      && this.numHashFunctions == that.numHashFunctions
+      && this.seed == that.seed;
+  }
+
+  @Override
+  public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
+    BloomFilterBase otherImplInstance = checkCompatibilityForMerge(other);
+
+    this.bits.putAll(otherImplInstance.bits);
+    return this;
+  }
+
+  @Override
+  public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException {
+    BloomFilterBase otherImplInstance = checkCompatibilityForMerge(other);
+
+    this.bits.and(otherImplInstance.bits);
+    return this;
+  }
+
+  @Override
+  public long cardinality() {
+    return this.bits.cardinality();
+  }
+
+  protected abstract BloomFilterBase checkCompatibilityForMerge(BloomFilter other)
+    throws IncompatibleMergeException;
+
+  public record HiLoHash(int hi, int lo) {}
+
+}
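
The `expectedFpp()` method in the class above raises the fill ratio of the bit array to the power of the hash function count. A minimal standalone sketch of that arithmetic follows, using `java.util.BitSet` as a stand-in for Spark's `BitArray`; the class name `ExpectedFppSketch` and the explicit `bitSize` parameter are assumptions made for illustration.

```java
import java.util.BitSet;

final class ExpectedFppSketch {
  // A lookup reports "might contain" only if all k probed bits are set, so with a
  // fill ratio p (set bits / total bits) the estimated false positive rate is p^k.
  static double expectedFpp(BitSet bits, long bitSize, int numHashFunctions) {
    return Math.pow((double) bits.cardinality() / bitSize, numHashFunctions);
  }

  public static void main(String[] args) {
    BitSet bits = new BitSet(100);
    bits.set(0, 50); // half of the 100 bits are set
    System.out.println(expectedFpp(bits, 100, 2));
  }
}
```

The estimate grows quickly as the filter fills up, which is why a scheme that concentrates all insertions into a small part of the bit array inflates the false positive rate.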
