Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
package org.elasticsearch.search.aggregations.metrics;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LongBitSet;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.IntArray;
Expand Down Expand Up @@ -63,7 +63,7 @@ public final class HyperLogLogPlusPlus implements Releasable {
// Flag value that algorithm.get(...) is compared against to detect a bucket that uses HyperLogLog.
private static final boolean HYPERLOGLOG = true;
// Public precision constant (value 14).
public static final int DEFAULT_PRECISION = 14;

// Per-bucket flag recording which algorithm a bucket currently uses.
// NOTE(review): the two declarations below look like the before/after lines of a diff
// (OpenBitSet replaced by BitArray); confirm that only one remains in the real file.
private final OpenBitSet algorithm;
private final BitArray algorithm;
// HyperLogLog state, addressed through its mutable "bucket" field.
private final HyperLogLog hll;
// Linear-counting state, addressed through its mutable "bucket" field.
private final LinearCounting lc;

Expand All @@ -89,7 +89,7 @@ public static long memoryUsage(int precision) {
/**
 * Creates the HyperLogLog store, the linear-counting store and the per-bucket algorithm flags.
 *
 * @param precision          forwarded to the HyperLogLog and LinearCounting constructors
 * @param bigArrays          forwarded to the HyperLogLog, LinearCounting and BitArray constructors
 * @param initialBucketCount forwarded to the HyperLogLog and LinearCounting constructors
 */
public HyperLogLogPlusPlus(int precision, BigArrays bigArrays, long initialBucketCount) {
hll = new HyperLogLog(bigArrays, initialBucketCount, precision);
// The linear-counting store is handed the HyperLogLog instance created just above.
lc = new LinearCounting(bigArrays, initialBucketCount, precision, hll);
// NOTE(review): the two assignments below look like the before/after lines of a diff
// (OpenBitSet replaced by BitArray); confirm that only one remains in the real file.
algorithm = new OpenBitSet();
algorithm = new BitArray(1, bigArrays);
}

public int precision() {
Expand All @@ -104,15 +104,23 @@ public void merge(long thisBucket, HyperLogLogPlusPlus other, long otherBucket)
if (precision() != other.precision()) {
throw new IllegalArgumentException();
}
if (thisBucket > Integer.MAX_VALUE) {
// This many buckets would take TB of memory anyway.
throw new UnsupportedOperationException("Cardinality only supports collecting up to [" + Integer.MAX_VALUE + "] buckets.");
}
if (otherBucket > Integer.MAX_VALUE) {
// This many buckets would take TB of memory anyway.
throw new UnsupportedOperationException("Cardinality only supports collecting up to [" + Integer.MAX_VALUE + "] buckets.");
}
hll.bucket = thisBucket;
lc.bucket = thisBucket;
hll.ensureCapacity(thisBucket + 1);
if (other.algorithm.get(otherBucket) == LINEAR_COUNTING) {
if (other.algorithm.get((int) otherBucket) == LINEAR_COUNTING) {
other.lc.bucket = otherBucket;
final AbstractLinearCounting.HashesIterator values = other.lc.values();
while (values.next()) {
final int encoded = values.value();
if (algorithm.get(thisBucket) == LINEAR_COUNTING) {
if (algorithm.get((int) thisBucket) == LINEAR_COUNTING) {
final int newSize = lc.addEncoded(encoded);
if (newSize > lc.threshold) {
upgradeToHll(thisBucket);
Expand All @@ -122,7 +130,7 @@ public void merge(long thisBucket, HyperLogLogPlusPlus other, long otherBucket)
}
}
} else {
if (algorithm.get(thisBucket) != HYPERLOGLOG) {
if (algorithm.get((int) thisBucket) != HYPERLOGLOG) {
upgradeToHll(thisBucket);
}
other.hll.bucket = otherBucket;
Expand All @@ -131,8 +139,12 @@ public void merge(long thisBucket, HyperLogLogPlusPlus other, long otherBucket)
}

public void collect(long bucket, long hash) {
if (bucket > Integer.MAX_VALUE) {
// This many buckets would take TB of memory anyway.
throw new UnsupportedOperationException("Cardinality only supports collecting up to [" + Integer.MAX_VALUE + "] buckets.");
}
hll.ensureCapacity(bucket + 1);
if (algorithm.get(bucket) == LINEAR_COUNTING) {
if (algorithm.get((int) bucket) == LINEAR_COUNTING) {
lc.bucket = bucket;
final int newSize = lc.collect(hash);
if (newSize > lc.threshold) {
Expand All @@ -145,7 +157,11 @@ public void collect(long bucket, long hash) {
}

public long cardinality(long bucket) {
if (algorithm.get(bucket) == LINEAR_COUNTING) {
if (bucket > Integer.MAX_VALUE) {
// This many buckets would take TB of memory anyway.
throw new UnsupportedOperationException("Cardinality only supports collecting up to [" + Integer.MAX_VALUE + "] buckets.");
}
if (algorithm.get((int) bucket) == LINEAR_COUNTING) {
lc.bucket = bucket;
return lc.cardinality();
} else {
Expand Down Expand Up @@ -173,19 +189,19 @@ void upgradeToHll(long bucket) {
final int encoded = values.get(j);
hll.collectEncoded(encoded);
}
algorithm.set(bucket);
algorithm.set((int) bucket);
} finally {
Releasables.close(values);
}
}

/**
 * Releases the resources held by this instance through {@code Releasables.close}.
 */
@Override
public void close() {
// NOTE(review): the two calls below look like the before/after lines of a diff; the second
// one additionally closes the algorithm flags. Confirm that only one remains in the real file.
Releasables.close(hll, lc);
Releasables.close(algorithm, hll, lc);
}

private Object getComparableData(long bucket) {
if (algorithm.get(bucket) == LINEAR_COUNTING) {
if (algorithm.get((int) bucket) == LINEAR_COUNTING) {
lc.bucket = bucket;
return lc.getComparableData();
} else {
Expand All @@ -195,18 +211,18 @@ private Object getComparableData(long bucket) {
}

/**
 * Hashes one bucket from the precision, the bucket's algorithm flag and the bucket's comparable data.
 *
 * @param bucket the bucket to hash
 * @return the value produced by {@code Objects.hash} over those three components
 */
public int hashCode(long bucket) {
// NOTE(review): the two returns below look like the before/after lines of a diff. The second
// narrows bucket to int without the Integer.MAX_VALUE guard used in collect/cardinality/merge.
return Objects.hash(precision(), algorithm.get(bucket), getComparableData(bucket));
return Objects.hash(precision(), algorithm.get((int) bucket), getComparableData(bucket));
}

/**
 * Compares the same bucket index in this instance and in {@code other}.
 *
 * @param bucket the bucket index looked up in both instances
 * @param other  the instance to compare against
 * @return {@code true} when precision, the bucket's algorithm flag and the bucket's comparable data all match
 */
public boolean equals(long bucket, HyperLogLogPlusPlus other) {
return Objects.equals(precision(), other.precision())
// NOTE(review): the next two conjuncts look like the before/after lines of a diff; the second
// narrows bucket to int without a range check. Confirm that only one remains in the real file.
&& Objects.equals(algorithm.get(bucket), other.algorithm.get(bucket))
&& Objects.equals(algorithm.get((int) bucket), other.algorithm.get((int) bucket))
&& Objects.equals(getComparableData(bucket), other.getComparableData(bucket));
}

public void writeTo(long bucket, StreamOutput out) throws IOException {
out.writeVInt(precision());
if (algorithm.get(bucket) == LINEAR_COUNTING) {
if (algorithm.get((int) bucket) == LINEAR_COUNTING) {
out.writeBoolean(LINEAR_COUNTING);
lc.bucket = bucket;
AbstractLinearCounting.HashesIterator hashes = lc.values();
Expand Down Expand Up @@ -485,31 +501,4 @@ public int value() {
return value;
}
}

/**
 * Small growable bit set layered over a {@code LongBitSet}, in the spirit of the old OpenBitSet:
 * reading past the current length yields {@code false}, and writes grow the backing set first.
 */
static class OpenBitSet {
    /** Backing bit set; starts with room for 64 bits and is replaced whenever it has to grow. */
    LongBitSet impl = new LongBitSet(64);

    /** Returns the bit at {@code bit}; an index at or beyond the current length reads as unset. */
    boolean get(long bit) {
        return bit < impl.length() && impl.get(bit);
    }

    /** Replaces {@code impl} with whatever {@code LongBitSet.ensureCapacity} returns for {@code bit}. */
    void ensureCapacity(long bit) {
        final LongBitSet resized = LongBitSet.ensureCapacity(impl, bit);
        impl = resized;
    }

    /** Makes sure {@code bit} is addressable, then sets it. */
    void set(long bit) {
        this.ensureCapacity(bit);
        this.impl.set(bit);
    }

    /** Makes sure {@code bit} is addressable, then clears it. */
    void clear(long bit) {
        this.ensureCapacity(bit);
        this.impl.clear(bit);
    }
}
}