Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ int getVersionNumber() {
*/
public abstract void writeTo(OutputStream out) throws IOException;

/**
 * Returns the number of set bits ("1" bits) in this filter's underlying bit array.
 *
 * <p>The base-class default is unsupported; concrete implementations that own a bit
 * array (e.g. {@code BloomFilterImpl}) override this method.
 *
 * @return the number of set bits in this {@link BloomFilter}.
 * @throws UnsupportedOperationException if the implementation does not support this operation.
 */
public long cardinality() {
  throw new UnsupportedOperationException("Not implemented");
}

/**
* Reads in a {@link BloomFilter} from an input stream. It is the caller's responsibility to close
* the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ public BloomFilter intersectInPlace(BloomFilter other) throws IncompatibleMergeE
return this;
}

/**
 * {@inheritDoc}
 *
 * <p>Delegates to the backing bit array's population count.
 */
@Override
public long cardinality() {
  return bits.cardinality();
}

private BloomFilterImpl checkCompatibilityForMerge(BloomFilter other)
throws IncompatibleMergeException {
// Duplicates the logic of `isCompatible` here to provide better error message.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
import org.apache.spark.sql.types.{BooleanType, DataType, StructType}

/**
 * An internal function that returns the result of aggregate operations (min, max and bloom
 * filter) for `structTypeExpression`; the min and max results are employed to prune KE
 * segments. This design is therefore only available for KE; the related issue is KE-29673.
 * As with the `BloomFilterMightContain` expression, this expression requires that
 * `structTypeExpression` is either a constant value or an uncorrelated scalar sub-query.
 *
 * @param structTypeExpression the struct type including aggregate operations.
 * @param valueExpression the application side target column expression.
 * @param applicationSideAttrRef the attribute reference for `valueExpression`; this parameter
 *                               will be used to construct `rangeRow` iff `valueExpression` is
 *                               transformed to a non-AttributeReference type.
 */
case class BloomAndRangeFilterExpression(
    structTypeExpression: Expression,
    valueExpression: Expression,
    applicationSideAttrRef: AttributeReference)
  extends BinaryExpression with BloomRuntimeFilterHelper {

  // Field positions inside the struct produced by the aggregate sub-query:
  // (min, max, serialized bloom filter binary).
  val MIN_INDEX = 0
  val MAX_INDEX = 1
  val BINARY_INDEX = 2

  override def nullable: Boolean = true
  override def left: Expression = structTypeExpression
  override def right: Expression = valueExpression
  override def prettyName: String = "bloom_and_range_filter"
  override def dataType: DataType = BooleanType
  // The filter is probed with 64-bit hashes, so wrap the value side in XxHash64 before probing.
  def decoratedRight: Expression = new XxHash64(Seq(right))

  override def checkInputDataTypes(): TypeCheckResult = {
    left.dataType match {
      case StructType(_) =>
        structTypeExpression match {
          // A constant (foldable) struct value is acceptable.
          case e : Expression if e.foldable => TypeCheckResult.TypeCheckSuccess
          // An uncorrelated scalar sub-query (no outer references) is acceptable too.
          case subquery : PlanExpression[_] if !subquery.containsPattern(OUTER_REFERENCE) =>
            TypeCheckResult.TypeCheckSuccess
          case _ =>
            TypeCheckResult.TypeCheckFailure(
              s"The bloom and range filter binary input to $prettyName " +
                "should be either a constant value or a scalar sub-query expression")
        }
      case _ => TypeCheckResult.TypeCheckFailure(
        s"Input to function $prettyName should be a StructType, " +
          s"which includes aggregate operations for min, max and bloom filter, " +
          s"but it's a [${left.dataType.catalogString}]")
    }
  }

  override protected def withNewChildrenInternal(
      newStructTypeExpression: Expression,
      newValueExpression: Expression): BloomAndRangeFilterExpression =
    copy(structTypeExpression = newStructTypeExpression, valueExpression = newValueExpression)

  // Result row of the aggregate sub-query, evaluated once and cached.
  // NOTE(review): assumes `structTypeExpression` is evaluable (sub-query already executed)
  // by the time this lazy val is forced — confirm against the rule injecting this expression.
  @transient private lazy val subQueryRowResult = {
    structTypeExpression.eval().asInstanceOf[UnsafeRow]
  }

  // Range predicates (attr >= min, attr <= max) derived from the sub-query result;
  // empty when either bound is null.
  @transient lazy val rangeRow: Seq[Expression] = {
    val structFields = left.dataType.asInstanceOf[StructType].fields
    val minDataType = structFields(MIN_INDEX).dataType
    val min = subQueryRowResult.get(MIN_INDEX, minDataType)
    val maxDataType = structFields(MAX_INDEX).dataType
    val max = subQueryRowResult.get(MAX_INDEX, maxDataType)
    if(min != null && max != null) {
      // Prefer the value expression itself when it is a direct column reference; otherwise
      // fall back to the attribute reference supplied by the caller.
      val attrRef = valueExpression match {
        case reference: AttributeReference =>
          reference
        case _ =>
          applicationSideAttrRef
      }
      val gteExpress = GreaterThanOrEqual(attrRef, Literal(convertToScala(min, minDataType)))
      val lteExpress = LessThanOrEqual(attrRef, Literal(convertToScala(max, maxDataType)))
      Seq(gteExpress, lteExpress)
    } else {
      Seq()
    }
  }

  // Bloom filter deserialized from the binary field of the sub-query result;
  // null when no filter binary was produced.
  @transient private lazy val bloomFilter = {
    val bytes = subQueryRowResult.getBinary(BINARY_INDEX)
    if(bytes == null) null else deserialize(bytes)
  }

  override def eval(input: InternalRow): Any = {
    internalEval(input, bloomFilter, decoratedRight)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    internalDoGenCode(ctx, ev, bloomFilter, decoratedRight, dataType)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
import org.apache.spark.sql.types._

/**
 * An internal scalar function that returns the membership check result (either true or false)
 * for values of `valueExpression` in the Bloom filter represented by `bloomFilterExpression`.
 * Note that since the function is "might contain", always returning true regardless is not
 * wrong.
 * Note that this expression requires that `bloomFilterExpression` is either a constant value or
 * an uncorrelated scalar subquery. This is sufficient for the Bloom filter join rewrite.
 *
 * @param bloomFilterExpression the Binary data of Bloom filter.
 * @param valueExpression the Long value to be tested for the membership of `bloomFilterExpression`.
 */
case class BloomFilterMightContain(
    bloomFilterExpression: Expression,
    valueExpression: Expression) extends BinaryExpression with BloomRuntimeFilterHelper {

  override def nullable: Boolean = true
  override def left: Expression = bloomFilterExpression
  override def right: Expression = valueExpression
  override def prettyName: String = "might_contain"
  override def dataType: DataType = BooleanType

  override def checkInputDataTypes(): TypeCheckResult = {
    (left.dataType, right.dataType) match {
      // NullType is accepted on either side so literal nulls type-check;
      // evaluation then simply yields null.
      case (BinaryType, NullType) | (NullType, LongType) | (NullType, NullType) |
        (BinaryType, LongType) =>
        bloomFilterExpression match {
          // A constant (foldable) binary value is acceptable.
          case e : Expression if e.foldable => TypeCheckResult.TypeCheckSuccess
          // An uncorrelated scalar subquery (no outer references) is acceptable too.
          case subquery : PlanExpression[_] if !subquery.containsPattern(OUTER_REFERENCE) =>
            TypeCheckResult.TypeCheckSuccess
          case _ =>
            TypeCheckResult.TypeCheckFailure(s"The Bloom filter binary input to $prettyName " +
              "should be either a constant value or a scalar subquery expression")
        }
      case _ => TypeCheckResult.TypeCheckFailure(s"Input to function $prettyName should have " +
        s"been ${BinaryType.simpleString} followed by a value with ${LongType.simpleString}, " +
        s"but it's [${left.dataType.catalogString}, ${right.dataType.catalogString}].")
    }
  }

  override protected def withNewChildrenInternal(
      newBloomFilterExpression: Expression,
      newValueExpression: Expression): BloomFilterMightContain =
    copy(bloomFilterExpression = newBloomFilterExpression,
      valueExpression = newValueExpression)

  // The bloom filter created from `bloomFilterExpression`, deserialized once and cached.
  // Null when the expression evaluates to null (i.e. no filter was built).
  @transient private lazy val bloomFilter = {
    val bytes = bloomFilterExpression.eval().asInstanceOf[Array[Byte]]
    if (bytes == null) null else deserialize(bytes)
  }

  override def eval(input: InternalRow): Any = {
    internalEval(input, bloomFilter, valueExpression)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    internalDoGenCode(ctx, ev, bloomFilter, valueExpression, dataType)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import java.io.ByteArrayInputStream

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, JavaCode, TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.sketch.BloomFilter

/**
 * Shared interpreted-mode and codegen-mode evaluation logic for bloom-filter based
 * runtime-filter expressions.
 */
trait BloomRuntimeFilterHelper {

  /**
   * Interpreted evaluation: probes `bloomFilter` with the long value produced by
   * `evalExpression` on `input`. Returns null when the filter or the evaluated value is null.
   */
  def internalEval(input: InternalRow, bloomFilter: BloomFilter,
      evalExpression: Expression): Any = {
    bloomFilter match {
      case null => null
      case filter =>
        evalExpression.eval(input) match {
          case null => null
          case value => filter.mightContainLong(value.asInstanceOf[Long])
        }
    }
  }

  /**
   * Codegen evaluation: emits Java that probes the referenced bloom filter with the
   * generated value of `evalExpression`. When no filter is present the expression is
   * statically null.
   */
  def internalDoGenCode(ctx: CodegenContext, ev: ExprCode,
      bloomFilter: BloomFilter, evalExpression: Expression, dataType: DataType): ExprCode = {
    if (bloomFilter == null) {
      ev.copy(isNull = TrueLiteral, value = JavaCode.defaultLiteral(dataType))
    } else {
      // Register the filter as a reference object so the generated code can reach it.
      val filterRef = ctx.addReferenceObj("bloomFilter", bloomFilter, classOf[BloomFilter].getName)
      val childEval = evalExpression.genCode(ctx)
      ev.copy(code = code"""
        ${childEval.code}
        boolean ${ev.isNull} = ${childEval.isNull};
        ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
        if (!${ev.isNull}) {
          ${ev.value} = $filterRef.mightContainLong((Long)${childEval.value});
        }""")
    }
  }

  /**
   * Deserializes a [[BloomFilter]] from its serialized binary form.
   */
  def deserialize(bytes: Array[Byte]): BloomFilter = {
    val stream = new ByteArrayInputStream(bytes)
    try BloomFilter.readFrom(stream) finally stream.close()
  }

}
Loading