Closed

Commits (46)
3c5a843
SPARK-47547 BloomFilter fpp degradation: addressing the int32 truncation
ishnagy May 12, 2025
08cbfeb
SPARK-47547 BloomFilter fpp degradation: fixing test data repetition …
ishnagy May 13, 2025
e3cb08e
SPARK-47547 BloomFilter fpp degradation: scrambling the high 32bytes …
ishnagy May 13, 2025
c4e3f58
SPARK-47547 BloomFilter fpp degradation: random distribution fpp test
ishnagy May 13, 2025
1a0b66f
SPARK-47547 BloomFilter fpp degradation: javadoc for test methods, ch…
ishnagy May 19, 2025
d912b66
SPARK-47547 BloomFilter fpp degradation: make seed serialization back…
ishnagy May 19, 2025
f589e2c
SPARK-47547 BloomFilter fpp degradation: counting discarded odd items…
ishnagy May 19, 2025
f597c76
SPARK-47547 BloomFilter fpp degradation: refactoring FPP counting log…
ishnagy May 19, 2025
4ea633d
SPARK-47547 BloomFilter fpp degradation: checkstyle fix
ishnagy May 19, 2025
6696106
SPARK-47547 BloomFilter fpp degradation: fix test bug
ishnagy May 21, 2025
b75e187
SPARK-47547 BloomFilter fpp degradation: parallelization friendly tes…
ishnagy May 26, 2025
2d8a9f1
SPARK-47547 BloomFilter fpp degradation: parallelization friendly tes…
ishnagy May 26, 2025
4a30794
SPARK-47547 BloomFilter fpp degradation: parallelization friendly tes…
ishnagy May 26, 2025
d9d6980
SPARK-47547 BloomFilter fpp degradation: addressing concerns around d…
ishnagy Jun 16, 2025
39a46c9
SPARK-47547 BloomFilter fpp degradation: cut down test cases to decre…
ishnagy Jun 17, 2025
7f235e7
Merge branch 'master' into SPARK-47547_bloomfilter_fpp_degradation
ishnagy Jun 17, 2025
16be3a9
SPARK-47547 BloomFilter fpp degradation: revert creating a new SlowTe…
ishnagy Jun 17, 2025
e91b5ca
SPARK-47547 BloomFilter fpp degradation: disable progress logging by …
ishnagy Jun 17, 2025
897c1d4
SPARK-47547 BloomFilter fpp degradation: adjust tolerance and fail on…
ishnagy Jun 18, 2025
013bfe4
SPARK-47547 BloomFilter fpp degradation: make V1/V2 distinction in Bl…
ishnagy Jul 6, 2025
6d44c1e
SPARK-47547 BloomFilter fpp degradation: scrambling test input withou…
ishnagy Jul 6, 2025
925bf12
SPARK-47547 BloomFilter fpp degradation: parallelizing BloomFilter re…
ishnagy Jul 6, 2025
6f28882
SPARK-47547 BloomFilter fpp degradation: add seed to equals/hashCode
ishnagy Jul 7, 2025
ed6caac
SPARK-47547 BloomFilter fpp degradation: checkstyle fix
ishnagy Jul 7, 2025
7d4ef74
SPARK-47547 BloomFilter fpp degradation: remove dependency between lo…
ishnagy Jul 7, 2025
c52ead3
Merge branch 'master' into SPARK-47547_bloomfilter_fpp_degradation
ishnagy Jul 7, 2025
0ab8276
SPARK-47547 BloomFilter fpp degradation: running /dev/scalafmt
ishnagy Jul 7, 2025
d2477bf
SPARK-47547 BloomFilter fpp degradation: javadoc comment for the V2 enum
ishnagy Jul 7, 2025
413c4fe
SPARK-47547 BloomFilter fpp degradation: reindent with 2 spaces
ishnagy Jul 7, 2025
4599fcb
SPARK-47547 BloomFilter fpp degradation: (recover empty line in Bloom…
ishnagy Jul 7, 2025
1ee2e13
SPARK-47547 BloomFilter fpp degradation: JEP-361 style switches
ishnagy Jul 7, 2025
c501b2a
SPARK-47547 BloomFilter fpp degradation: removing Objects::equals
ishnagy Jul 7, 2025
1f5cfb6
SPARK-47547 BloomFilter fpp degradation: add missing seed comparison …
ishnagy Jul 7, 2025
f60d55f
SPARK-47547 BloomFilter fpp degradation: checkstyle
ishnagy Jul 8, 2025
0314963
SPARK-47547 BloomFilter fpp degradation: BloomFilterBase abstract par…
ishnagy Jul 11, 2025
f2df338
SPARK-47547 BloomFilter fpp degradation: pull up long and byte hashin…
ishnagy Jul 11, 2025
4aaff83
SPARK-47547 BloomFilter fpp degradation: checkstyle
ishnagy Jul 13, 2025
e214bd7
SPARK-47547 BloomFilter fpp degradation: removing unnecessary line wr…
ishnagy Jul 15, 2025
99f7343
SPARK-47547 BloomFilter fpp degradation: moving junit-pioneer version…
ishnagy Jul 15, 2025
58e3066
SPARK-47547 BloomFilter fpp degradation: (empty line juggling)
ishnagy Jul 15, 2025
c06cb38
SPARK-47547 BloomFilter fpp degradation: pull up common hash scatteri…
ishnagy Jul 15, 2025
b99ef3a
SPARK-47547 BloomFilter fpp degradation: (empty line juggling)
ishnagy Jul 15, 2025
ce3ad76
SPARK-47547 BloomFilter fpp degradation: remove redundant default cas…
ishnagy Jul 15, 2025
626e459
SPARK-47547 BloomFilter fpp degradation: properly capitalize InputStr…
ishnagy Jul 15, 2025
b0f5b45
SPARK-47547 BloomFilter fpp degradation: indenting method parameters …
ishnagy Jul 15, 2025
6849dbe
SPARK-47547 BloomFilter fpp degradation: removing junit-pioneer from …
ishnagy Jul 22, 2025
7 changes: 7 additions & 0 deletions common/sketch/pom.xml
@@ -51,6 +51,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit-pioneer</groupId>
<artifactId>junit-pioneer</artifactId>
<version>2.3.0</version>
Contributor:
For the management of dependency versions, they should be placed in the parent pom.xml. However, if TestSparkBloomFilter can be removed from the current pr, then it seems that this dependency is no longer needed either.

Contributor Author:
I'll defer addressing this, until we decide what should happen with TestSparkBloomFilter.
(remove & move the versions under managed dependencies)

Contributor:
If we keep the dependency then please move the version to the main pom.

<scope>test</scope>
</dependency>

</dependencies>

<build>
common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -17,9 +17,12 @@

package org.apache.spark.util.sketch;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
@@ -42,6 +45,7 @@
public abstract class BloomFilter {

public enum Version {

/**
* {@code BloomFilter} binary format version 1. All values written in big-endian order:
* <ul>
Expand All @@ -51,7 +55,22 @@ public enum Version {
* <li>The words/longs (numWords * 64 bit)</li>
* </ul>
*/
V1(1);
V1(1),

/**
* {@code BloomFilter} binary format version 2.
* Fixes the int32 truncation issue with V1 indexes, but by changing the bit pattern,
* it will become incompatible with V1 serializations.
* All values written in big-endian order:
* <ul>
* <li>Version number, always 2 (32 bit)</li>
* <li>Number of hash functions (32 bit)</li>
* <li>Integer seed to initialize hash functions (32 bit) </li>
* <li>Total number of words of the underlying bit array (32 bit)</li>
* <li>The words/longs (numWords * 64 bit)</li>
* </ul>
*/
V2(2);
Member:
If we want to add V2, we need to add a new comment block for V2 because the above comment is for V1 only.

* <li>Version number, always 1 (32 bit)</li>

Contributor Author:
added some comments in d2477bf
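To illustrate the truncation that the V2 format addresses, here is a small, self-contained sketch. This is not Spark code: the hash values, the bit-array size, and the 64-bit combination at the end are illustrative assumptions. The point is only that an int-typed combined hash can never address a bit position above Integer.MAX_VALUE, while a 64-bit combination can.

// Illustrative sketch only; values and the widening strategy are assumptions, not Spark's code.
public class Int32TruncationDemo {
  public static void main(String[] args) {
    long bitSize = 3L * Integer.MAX_VALUE;   // a bit array larger than 2^31 bits
    int h1 = 0x12345678;                     // first 32-bit hash of some item
    int h2 = 0x9abcdef0;                     // second 32-bit hash of the same item

    for (int i = 1; i <= 3; i++) {
      // V1-style combination stays in 32-bit space and wraps around:
      int combined32 = h1 + i * h2;
      if (combined32 < 0) combined32 = ~combined32;
      long narrowIndex = combined32 % bitSize;   // never exceeds Integer.MAX_VALUE

      // Combining in 64-bit space is one way to make every bit position reachable:
      long combined64 = (long) h1 + (long) i * h2;
      long wideIndex = Long.remainderUnsigned(combined64, bitSize);

      System.out.printf("i=%d narrowIndex=%d wideIndex=%d%n", i, narrowIndex, wideIndex);
    }
  }
}

Once the optimal number of bits for the requested fpp exceeds 2^31, the narrow indexes leave most of the bit array permanently empty while the low region saturates, which is the fpp degradation reported in SPARK-47547.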


private final int versionNumber;

@@ -175,14 +194,26 @@ public long cardinality() {
* the stream.
*/
public static BloomFilter readFrom(InputStream in) throws IOException {
return BloomFilterImpl.readFrom(in);
// peek into the inputstream so we can determine the version
BufferedInputStream bin = new BufferedInputStream(in);
bin.mark(4);
int version = ByteBuffer.wrap(bin.readNBytes(4)).getInt();
bin.reset();

return switch (version) {
case 1 -> BloomFilterImpl.readFrom(bin);
case 2 -> BloomFilterImplV2.readFrom(bin);
default -> throw new IllegalArgumentException("Unknown BloomFilter version: " + version);
};
}

/**
* Reads in a {@link BloomFilter} from a byte array.
*/
public static BloomFilter readFrom(byte[] bytes) throws IOException {
return BloomFilterImpl.readFrom(bytes);
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
return readFrom(bis);
}
}

/**
@@ -247,15 +278,28 @@ public static BloomFilter create(long expectedNumItems, double fpp) {
"False positive probability must be within range (0.0, 1.0)"
);
}

Member (@dongjoon-hyun, Jul 7, 2025):
Please recover this irrelevant removal.

We recommend not to mix a style change with the functional change. For example, even inside this PR, this removal is inconsistent with your own code style, because this PR added a new empty line at line 199 before the return statement.

Contributor Author:
fixed in 4599fcb

Contributor:
This still doesn't look good and there is an extra blank line below.

return create(expectedNumItems, optimalNumOfBits(expectedNumItems, fpp));
}


/**
* Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code numBits}, it will
* pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
*/
public static BloomFilter create(long expectedNumItems, long numBits) {
return create(Version.V2, expectedNumItems, numBits, BloomFilterImplV2.DEFAULT_SEED);
}

public static BloomFilter create(long expectedNumItems, long numBits, int seed) {
return create(Version.V2, expectedNumItems, numBits, seed);
}

public static BloomFilter create(
Version version,
long expectedNumItems,
long numBits,
int seed
) {
if (expectedNumItems <= 0) {
throw new IllegalArgumentException("Expected insertions must be positive");
}
@@ -264,6 +308,12 @@ public static BloomFilter create(long expectedNumItems, long numBits) {
throw new IllegalArgumentException("Number of bits must be positive");
}

return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);

return switch (version) {
case V1 -> new BloomFilterImpl(numHashFunctions, numBits);
case V2 -> new BloomFilterImplV2(numHashFunctions, numBits, seed);
default -> throw new IllegalArgumentException("Unknown BloomFilter version: " + version);
};
}
}
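Taken together, the new factory overloads and the version-detecting readFrom can be exercised roughly as follows. This is a hedged usage sketch based only on the signatures visible in this diff plus the pre-existing writeTo(OutputStream) method; the item counts, bit counts, and seed value are arbitrary, and for a V1 filter the seed argument is simply dropped by the factory.

// Usage sketch; counts, bits, and seed values are arbitrary.
import java.io.ByteArrayOutputStream;

import org.apache.spark.util.sketch.BloomFilter;

public class BloomFilterVersionRoundTrip {
  public static void main(String[] args) throws Exception {
    // The plain factory methods now default to the V2 format...
    BloomFilter v2 = BloomFilter.create(1_000_000, 0.03);
    // ...while V1 stays reachable through the explicit overload (its seed argument is ignored).
    BloomFilter v1 = BloomFilter.create(BloomFilter.Version.V1, 1_000_000, 8_000_000, 0);

    v2.putLong(42L);
    v1.putLong(42L);

    // The serialized form starts with the 32-bit version number (big-endian),
    // which is exactly what readFrom peeks at before choosing an implementation.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    v2.writeTo(out);
    BloomFilter restored = BloomFilter.readFrom(out.toByteArray());

    System.out.println(restored.mightContainLong(42L));   // true
    System.out.println(restored.mightContainLong(43L));   // false with high probability
  }
}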
common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterBase.java (new file)
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.sketch;

import java.util.Objects;

abstract class BloomFilterBase extends BloomFilter {

public static final int DEFAULT_SEED = 0;

protected int seed;
protected int numHashFunctions;
protected BitArray bits;

protected BloomFilterBase(int numHashFunctions, long numBits) {
this(numHashFunctions, numBits, DEFAULT_SEED);
}

protected BloomFilterBase(int numHashFunctions, long numBits, int seed) {
this(new BitArray(numBits), numHashFunctions, seed);
}

protected BloomFilterBase(BitArray bits, int numHashFunctions, int seed) {
this.bits = bits;
this.numHashFunctions = numHashFunctions;
this.seed = seed;
}

protected BloomFilterBase() {}

@Override
public boolean equals(Object other) {
if (other == this) {
return true;
}

if (!(other instanceof BloomFilterBase that)) {
return false;
}

return
this.getClass() == that.getClass()
&& this.numHashFunctions == that.numHashFunctions
&& this.seed == that.seed
// TODO: this.bits can be null temporarily, during deserialization,
// should we worry about this?
&& this.bits.equals(that.bits);
}

@Override
public int hashCode() {
return Objects.hash(numHashFunctions, seed, bits);
}

@Override
public double expectedFpp() {
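// The set-bit fraction approximates the chance that a single probe hits a set bit;
// all numHashFunctions probes must hit, hence (cardinality / bitSize) ^ numHashFunctions.
// For example, with half the bits set and 5 hash functions: 0.5^5 ≈ 0.031 (about 3.1%).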
return Math.pow((double) bits.cardinality() / bits.bitSize(), numHashFunctions);
}

@Override
public long bitSize() {
return bits.bitSize();
}

@Override
public boolean put(Object item) {
if (item instanceof String str) {
return putString(str);
} else if (item instanceof byte[] bytes) {
return putBinary(bytes);
} else {
return putLong(Utils.integralToLong(item));
}
}

@Override
public boolean putString(String item) {
return putBinary(Utils.getBytesFromUTF8String(item));
}

@Override
public abstract boolean putBinary(byte[] item);

@Override
public boolean mightContainString(String item) {
return mightContainBinary(Utils.getBytesFromUTF8String(item));
}

@Override
public abstract boolean mightContainBinary(byte[] item) ;

@Override
public abstract boolean putLong(long item);

@Override
public abstract boolean mightContainLong(long item);

@Override
public boolean mightContain(Object item) {
if (item instanceof String str) {
return mightContainString(str);
} else if (item instanceof byte[] bytes) {
return mightContainBinary(bytes);
} else {
return mightContainLong(Utils.integralToLong(item));
}
}

@Override
public boolean isCompatible(BloomFilter other) {
if (other == null) {
return false;
}

if (!(other instanceof BloomFilterBase that)) {
return false;
}

return
this.getClass() == that.getClass()
&& this.bitSize() == that.bitSize()
&& this.numHashFunctions == that.numHashFunctions
&& this.seed == that.seed;
}

@Override
public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
BloomFilterBase otherImplInstance = checkCompatibilityForMerge(other);

this.bits.putAll(otherImplInstance.bits);
return this;
}

@Override
public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeException {
BloomFilterBase otherImplInstance = checkCompatibilityForMerge(other);

this.bits.and(otherImplInstance.bits);
return this;
}

@Override
public long cardinality() {
return this.bits.cardinality();
}

protected abstract BloomFilterBase checkCompatibilityForMerge(BloomFilter other)
throws IncompatibleMergeException;

public record HiLoHash(int hi, int lo) {}

protected HiLoHash hashLongToIntPair(long item, int seed) {
// Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
// hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
// Note that `CountMinSketch` use a different strategy, it hash the input long element with
// every i to produce n hash values.
// TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
int h1 = Murmur3_x86_32.hashLong(item, seed);
int h2 = Murmur3_x86_32.hashLong(item, h1);
return new HiLoHash(h1, h2);
}

protected HiLoHash hashBytesToIntPair(byte[] item, int seed) {
int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, seed);
int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
return new HiLoHash(h1, h2);
}

}
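The comment in hashLongToIntPair describes the h1 + i * h2 scheme but leaves the index derivation to subclasses. Below is a hypothetical sketch of how a subclass could turn a HiLoHash into bit positions. It is not the actual BloomFilterImpl or BloomFilterImplV2 code: the class name is invented, the 64-bit combination is only one plausible way to avoid the V1 int truncation, and it assumes BitArray exposes set(long) and get(long) as used by the existing implementations. Serialization and merge-compatibility checks are stubbed out.

// Hypothetical sketch, not Spark's implementation; class name and index derivation are assumptions.
package org.apache.spark.util.sketch;

import java.io.IOException;
import java.io.OutputStream;

class ExampleScatteredBloomFilter extends BloomFilterBase {

  ExampleScatteredBloomFilter(int numHashFunctions, long numBits, int seed) {
    super(numHashFunctions, numBits, seed);
  }

  // Derive one bit position per hash function, combining the two 32-bit hashes
  // in 64-bit space so indexes can reach past bit 2^31 of a large bit array.
  private long index(HiLoHash hash, int i) {
    long combined = (long) hash.hi() + (long) i * hash.lo();
    return Long.remainderUnsigned(combined, bits.bitSize());
  }

  @Override
  public boolean putLong(long item) {
    HiLoHash hash = hashLongToIntPair(item, seed);
    boolean changed = false;
    for (int i = 1; i <= numHashFunctions; i++) {
      changed |= bits.set(index(hash, i));
    }
    return changed;
  }

  @Override
  public boolean mightContainLong(long item) {
    HiLoHash hash = hashLongToIntPair(item, seed);
    for (int i = 1; i <= numHashFunctions; i++) {
      if (!bits.get(index(hash, i))) {
        return false;
      }
    }
    return true;
  }

  @Override
  public boolean putBinary(byte[] item) {
    HiLoHash hash = hashBytesToIntPair(item, seed);
    boolean changed = false;
    for (int i = 1; i <= numHashFunctions; i++) {
      changed |= bits.set(index(hash, i));
    }
    return changed;
  }

  @Override
  public boolean mightContainBinary(byte[] item) {
    HiLoHash hash = hashBytesToIntPair(item, seed);
    for (int i = 1; i <= numHashFunctions; i++) {
      if (!bits.get(index(hash, i))) {
        return false;
      }
    }
    return true;
  }

  // Serialization and merge-compatibility checks are out of scope for this sketch.
  @Override
  public void writeTo(OutputStream out) throws IOException {
    throw new UnsupportedOperationException("not part of this sketch");
  }

  @Override
  protected BloomFilterBase checkCompatibilityForMerge(BloomFilter other)
      throws IncompatibleMergeException {
    throw new UnsupportedOperationException("not part of this sketch");
  }
}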