102 commits
a4c6c93
Add SQL config and push filters down to JSON
MaxGekk Jan 24, 2020
ac7c730
Add a test to JsonSuite
MaxGekk Jan 25, 2020
b0ff6c9
Push filters to JacksonParser
MaxGekk Jan 25, 2020
a79dacd
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Jan 25, 2020
bb12fd5
Refactor the test
MaxGekk Jan 25, 2020
ccc0940
Add convertRootObject
MaxGekk Jan 25, 2020
b7f17b1
Add JsonPredicate to JsonFilters
MaxGekk Jan 25, 2020
521a685
Implemented JsonFilters.reset
MaxGekk Jan 25, 2020
a8486bf
Implemented allPredicates
MaxGekk Jan 25, 2020
b0d6939
Add buildPredicates()
MaxGekk Jan 25, 2020
e814986
Simplify buildPredicates()
MaxGekk Jan 26, 2020
c05b1e9
Refactoring buildPredicates()
MaxGekk Jan 26, 2020
15f0390
Pass StructType to JsonFilters
MaxGekk Jan 26, 2020
02aca76
Embed code to indexedPredicates
MaxGekk Jan 26, 2020
bd1d093
Simplify skipRow and reset
MaxGekk Jan 26, 2020
1c64b37
renaming
MaxGekk Jan 26, 2020
f83b93a
Deduplicate code
MaxGekk Jan 26, 2020
dd547ce
Bug fix literals
MaxGekk Jan 26, 2020
0ada227
Adopt test for complex filters to JsonFilters
MaxGekk Jan 26, 2020
f0d6a72
Add JacksonParserSuite
MaxGekk Jan 27, 2020
52e65d0
Add a benchmark
MaxGekk Jan 27, 2020
c60b332
Check spark.sql.json.filterPushdown.enabled in JsonFilters
MaxGekk Jan 27, 2020
617197a
Update benchmark results for JDK 8
MaxGekk Jan 27, 2020
03da0b2
Add comments to StructFilters
MaxGekk Jan 27, 2020
a122fb7
Add comments to JsonFilters
MaxGekk Jan 27, 2020
0aa8499
Add a test for malformed JSON records
MaxGekk Jan 28, 2020
ee53875
Add more cases in JsonSuite
MaxGekk Jan 28, 2020
3381607
Update benchmark results on jdk 11
MaxGekk Jan 28, 2020
144f5a7
Dedup code to toPredicate
MaxGekk Jan 28, 2020
94a22e1
Dedup code in JacksonParser
MaxGekk Jan 28, 2020
5d0ead1
fix coding style
MaxGekk Jan 28, 2020
bd1853c
Bug fix: convert Option to Array explicitly
MaxGekk Jan 28, 2020
330aae7
Remove empty line in JsonBenchmark
MaxGekk Jan 31, 2020
449a7e5
Fix indentation in convertObject
MaxGekk Jan 31, 2020
4527660
Check correct SQL config in JsonScanBuilder
MaxGekk Jan 31, 2020
675682b
Add a test for pushed filters to JsonScanBuilder to JsonSuite
MaxGekk Jan 31, 2020
23191e9
Set default value for filters in JacksonParser
MaxGekk Jan 31, 2020
67a74ad
Fix typo: mep -> map
MaxGekk Jan 31, 2020
4a7f0b0
Remove unused import in CSVScanBuilder.scala
MaxGekk Jan 31, 2020
d9bb50f
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Jan 31, 2020
1230f60
Fix indentation in filterToExpression
MaxGekk Feb 1, 2020
6e0aa47
size -> length
MaxGekk Feb 1, 2020
a455977
Add test "case sensitivity of filters references" to JsonSuite
MaxGekk Feb 1, 2020
942b9a9
Compute set of schema field names only once
MaxGekk Feb 1, 2020
39b4487
Add test "case sensitivity of filters references" to CSVSuite
MaxGekk Feb 1, 2020
e53171b
Regen results of CSVBenchmark and JsonBenchmark on JDK 8
MaxGekk Feb 2, 2020
f4c63fa
Regen results of CSVBenchmark and JsonBenchmark on JDK 11
MaxGekk Feb 2, 2020
bd5b9a9
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Feb 5, 2020
a583247
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Feb 14, 2020
9c76267
Change year pattern for legacy parser: uuuu -> yyyy
MaxGekk Feb 14, 2020
3279fcb
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Feb 28, 2020
e78bacc
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Mar 4, 2020
dc66f82
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Mar 5, 2020
02cd63d
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Mar 6, 2020
443992a
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Mar 26, 2020
648c23b
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Apr 16, 2020
1c4f281
Remove duplicate import in JsonSuite
MaxGekk Apr 16, 2020
0e6ffb5
Re-gen benchmarks results on JDK 8
MaxGekk Apr 17, 2020
01a7ee3
Re-gen JSON and CSV benchmark results on JDK 11
MaxGekk Apr 17, 2020
4e623b3
Filter out not-supported filters
MaxGekk Apr 17, 2020
262e3c7
Merge remote-tracking branch 'origin/master' into json-filters-pushdown
MaxGekk May 21, 2020
f2d0cad
Merge remote-tracking branch 'origin/master' into json-filters-pushdown
MaxGekk May 25, 2020
db1ac35
Re-gen benchmarks on JDK 8
MaxGekk May 26, 2020
4c37c9a
Re-gen benchmarks on JDK 11
MaxGekk May 26, 2020
8bfd599
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk May 29, 2020
e08b6e0
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Jun 19, 2020
9012456
Set version 3.1.0 for the SQL config spark.sql.json.filterPushdown.en…
MaxGekk Jun 21, 2020
31ad92c
Add an assert to `skipRow()`
MaxGekk Jun 21, 2020
50b9bb2
Merge remote-tracking branch 'origin/master' into json-filters-pushdown
MaxGekk Jun 21, 2020
9a8ba45
Replace schema.fieldIndex(attr) by index
MaxGekk Jun 23, 2020
90559de
Remove s"
MaxGekk Jun 23, 2020
38eb601
Add a comment about benchmarks for filters w/ nested column attributes
MaxGekk Jun 23, 2020
0d44c04
Merge remote-tracking branch 'remotes/origin/master' into json-filter…
MaxGekk Jun 28, 2020
e57ebd1
Update JsonBenchmark-jdk11-results.txt
MaxGekk Jul 2, 2020
36412ca
Update JsonBenchmark-results.txt
MaxGekk Jul 2, 2020
b7bdcff
Merge remote-tracking branch 'origin/master' into json-filters-pushdown
MaxGekk Jul 2, 2020
eb79544
Update JsonBenchmark-jdk11-results.txt
MaxGekk Jul 3, 2020
0a133ad
Update JsonBenchmark-results.txt
MaxGekk Jul 3, 2020
d4b88d4
Merge remote-tracking branch 'origin/master' into json-filters-pushdown
MaxGekk Jul 7, 2020
6921415
Simplify `if else`
MaxGekk Jul 7, 2020
0a1e575
Exit earlier from skipRow
MaxGekk Jul 8, 2020
3df60c1
Add a comment for checkFilterRefs
MaxGekk Jul 8, 2020
649d187
Make toRef() private
MaxGekk Jul 9, 2020
2173343
Add comments for JsonFilters
MaxGekk Jul 9, 2020
e55bb50
Use StructFilters.pushedFilters()
MaxGekk Jul 9, 2020
60cd07a
Add a comment for toRef
MaxGekk Jul 9, 2020
8ecede6
can be places -> can be placed
MaxGekk Jul 9, 2020
77bd18e
And -> Or
MaxGekk Jul 9, 2020
864ba7d
Move refCount
MaxGekk Jul 10, 2020
0155b05
Fix comments
MaxGekk Jul 15, 2020
193a57a
Fix comments in StructFilters
MaxGekk Jul 15, 2020
35c056e
Fix comments in CSVFilters
MaxGekk Jul 15, 2020
43f75a6
Fix comments in JsonFilters
MaxGekk Jul 15, 2020
4d5fe2c
`index`` -> `index`
MaxGekk Jul 15, 2020
ba7db8b
Refactoring: adding types
MaxGekk Jul 15, 2020
0ca1417
Add protected to createFilters() in StructFiltersSuite
MaxGekk Jul 15, 2020
18dea26
Fix indentation in comments
MaxGekk Jul 15, 2020
3bf3270
Move common assumption from JsonFilters to StructFilters
MaxGekk Jul 15, 2020
3f7d338
Add tests back
MaxGekk Jul 15, 2020
6938ec5
JIRA for TODO
MaxGekk Jul 15, 2020
fc725bc
`non` -> `not`
MaxGekk Jul 15, 2020
57524d6
Merge remote-tracking branch 'origin/master' into json-filters-pushdown
MaxGekk Jul 15, 2020
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala (new file)
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import scala.util.Try

import org.apache.spark.sql.catalyst.StructFilters._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{BooleanType, StructType}

/**
* The class provides an API for applying pushed-down filters to partially or
* fully set internal rows that have the struct schema.
*
* `StructFilters` assumes that:
* - `reset()` is called before any `skipRow()` calls for a new row.
*
* @param pushedFilters The pushed-down source filters. The filters should refer to
* the fields of the provided schema.
* @param schema The required schema of records from datasource files.
*/
abstract class StructFilters(pushedFilters: Seq[sources.Filter], schema: StructType) {

protected val filters = StructFilters.pushedFilters(pushedFilters.toArray, schema)

/**
* Applies the pushed-down source filters to the given row, assuming that
* the value at `index` has already been set.
*
* @param row The row with fully or partially set values.
* @param index The index of the already set value.
* @return `true` if the currently processed row can be skipped, otherwise `false`.
*/
def skipRow(row: InternalRow, index: Int): Boolean

/**
* Resets the states of the pushed-down filters. The method must be called before
* processing any new row, otherwise `skipRow()` may return a wrong result.
*/
def reset(): Unit

/**
* Compiles source filters to a predicate.
*/
def toPredicate(filters: Seq[sources.Filter]): BasePredicate = {
val reducedExpr = filters
.sortBy(_.references.length)
.flatMap(filterToExpression(_, toRef))
.reduce(And)
Predicate.create(reducedExpr)
}

// Finds a filter attribute in the schema and converts it to a `BoundReference`
private def toRef(attr: String): Option[BoundReference] = {
// The names have been normalized and case sensitivity is not a concern here.
schema.getFieldIndex(attr).map { index =>
val field = schema(index)
BoundReference(index, field.dataType, field.nullable)
}
}
}

object StructFilters {
private def checkFilterRefs(filter: sources.Filter, fieldNames: Set[String]): Boolean = {
// The names have been normalized and case sensitivity is not a concern here.
filter.references.forall(fieldNames.contains)
}

/**
* Returns the filters currently supported by the datasource.
* @param filters The filters pushed down to the datasource.
* @param schema The data schema of the datasource files.
* @return a subset of `filters` that can be handled by the datasource.
*/
def pushedFilters(filters: Array[sources.Filter], schema: StructType): Array[sources.Filter] = {
val fieldNames = schema.fieldNames.toSet
filters.filter(checkFilterRefs(_, fieldNames))
}

private def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = {
a.zip(b).headOption
}

private def toLiteral(value: Any): Option[Literal] = {
Try(Literal(value)).toOption
}

/**
* Converts a filter to an expression and binds it to row positions.
*
* @param filter The filter to convert.
* @param toRef The function that converts a filter attribute to a bound reference.
* @return some expression with resolved attributes, or `None` if the conversion
* of the given filter to an expression is impossible.
*/
def filterToExpression(
filter: sources.Filter,
toRef: String => Option[BoundReference]): Option[Expression] = {
def zipAttributeAndValue(name: String, value: Any): Option[(BoundReference, Literal)] = {
zip(toRef(name), toLiteral(value))
}
def translate(filter: sources.Filter): Option[Expression] = filter match {
case sources.And(left, right) =>
zip(translate(left), translate(right)).map(And.tupled)
case sources.Or(left, right) =>
zip(translate(left), translate(right)).map(Or.tupled)
case sources.Not(child) =>
translate(child).map(Not)
case sources.EqualTo(attribute, value) =>
zipAttributeAndValue(attribute, value).map(EqualTo.tupled)
case sources.EqualNullSafe(attribute, value) =>
zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled)
case sources.IsNull(attribute) =>
toRef(attribute).map(IsNull)
case sources.IsNotNull(attribute) =>
toRef(attribute).map(IsNotNull)
case sources.In(attribute, values) =>
val literals = values.toSeq.flatMap(toLiteral)
if (literals.length == values.length) {
toRef(attribute).map(In(_, literals))
} else {
None
}
case sources.GreaterThan(attribute, value) =>
zipAttributeAndValue(attribute, value).map(GreaterThan.tupled)
case sources.GreaterThanOrEqual(attribute, value) =>
zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled)
case sources.LessThan(attribute, value) =>
zipAttributeAndValue(attribute, value).map(LessThan.tupled)
case sources.LessThanOrEqual(attribute, value) =>
zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled)
case sources.StringContains(attribute, value) =>
zipAttributeAndValue(attribute, value).map(Contains.tupled)
case sources.StringStartsWith(attribute, value) =>
zipAttributeAndValue(attribute, value).map(StartsWith.tupled)
case sources.StringEndsWith(attribute, value) =>
zipAttributeAndValue(attribute, value).map(EndsWith.tupled)
case sources.AlwaysTrue() =>
Some(Literal(true, BooleanType))
case sources.AlwaysFalse() =>
Some(Literal(false, BooleanType))
}
translate(filter)
}
}

class NoopFilters extends StructFilters(Seq.empty, new StructType()) {
override def skipRow(row: InternalRow, index: Int): Boolean = false
override def reset(): Unit = {}
}
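
To see how the pieces fit together, here is a minimal usage sketch, not part of the PR: it assumes only spark-catalyst on the classpath, and the schema, filters, and the StructFiltersDemo object name are invented for illustration. `StructFilters.pushedFilters()` keeps just the filters whose references all occur in the schema, and `filterToExpression()` translates a surviving source filter into a Catalyst expression bound to row positions.

import org.apache.spark.sql.catalyst.StructFilters
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

// Hypothetical demo object, not part of the PR.
object StructFiltersDemo extends App {
  val schema = new StructType().add("i", IntegerType).add("s", StringType)
  val filters = Array[sources.Filter](
    sources.GreaterThan("i", 0),  // references a known field: kept
    sources.IsNotNull("unknown")) // references a missing field: dropped
  // Only GreaterThan("i", 0) survives because "unknown" is not in the schema.
  val supported = StructFilters.pushedFilters(filters, schema)

  // The same kind of resolver that the class's private toRef() implements:
  // map an attribute name to a BoundReference via its field index.
  def toRef(attr: String): Option[BoundReference] = {
    val idx = schema.fieldNames.indexOf(attr)
    if (idx < 0) None
    else {
      val field = schema(idx)
      Some(BoundReference(idx, field.dataType, field.nullable))
    }
  }

  // Prints something like: (input[0, int, true] > 0)
  supported.flatMap(StructFilters.filterToExpression(_, toRef)).foreach(println)
}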
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.catalyst.csv
 
-import scala.util.Try
-
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, StructFilters}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * An instance of the class compiles filters to predicates and allows to
@@ -33,7 +31,8 @@ import org.apache.spark.sql.types.{BooleanType, StructType}
  * @param filters The filters pushed down to CSV datasource.
  * @param requiredSchema The schema with only fields requested by the upper layer.
  */
-class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) {
+class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType)
+  extends StructFilters(filters, requiredSchema) {
   /**
    * Converted filters to predicates and grouped by maximum field index
    * in the read schema. For example, if a filter refers to 2 attributes
@@ -54,138 +53,49 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType)
       for (filter <- filters) {
         val refs = filter.references
         val index = if (refs.isEmpty) {
-          // For example, AlwaysTrue and AlwaysFalse doesn't have any references
+          // For example, `AlwaysTrue` and `AlwaysFalse` don't have any references.
           // Filters w/o refs always return the same result. Taking into account
-          // that predicates are combined via And, we can apply such filters only
+          // that predicates are combined via `And`, we can apply such filters only
           // once at the position 0.
           0
         } else {
           // readSchema must contain attributes of all filters.
-          // Accordingly, fieldIndex() returns a valid index always.
+          // Accordingly, `fieldIndex()` always returns a valid index.
           refs.map(requiredSchema.fieldIndex).max
         }
         groupedFilters(index) :+= filter
       }
       if (len > 0 && !groupedFilters(0).isEmpty) {
-        // We assume that filters w/o refs like AlwaysTrue and AlwaysFalse
+        // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse`
         // can be evaluated faster than others. We put them in front of others.
         val (literals, others) = groupedFilters(0).partition(_.references.isEmpty)
         groupedFilters(0) = literals ++ others
       }
       for (i <- 0 until len) {
         if (!groupedFilters(i).isEmpty) {
-          val reducedExpr = groupedFilters(i)
-            .flatMap(CSVFilters.filterToExpression(_, toRef))
-            .reduce(And)
-          groupedPredicates(i) = Predicate.create(reducedExpr)
+          groupedPredicates(i) = toPredicate(groupedFilters(i))
         }
       }
     }
     groupedPredicates
   }
 
   /**
-   * Applies all filters that refer to row fields at the positions from 0 to index.
+   * Applies all filters that refer to row fields at the positions from 0 to `index`.
    * @param row The internal row to check.
    * @param index Maximum field index. The function assumes that all fields
-   *              from 0 to index position are set.
-   * @return false iff row fields at the position from 0 to index pass filters
+   *              from 0 to the `index` position are set.
+   * @return `false` iff row fields at the positions from 0 to `index` pass filters
    *         or there are no applicable filters,
-   *         otherwise false if at least one of the filters returns false.
+   *         otherwise `true` if at least one of the filters returns `false`.
    */
   def skipRow(row: InternalRow, index: Int): Boolean = {
     val predicate = predicates(index)
     predicate != null && !predicate.eval(row)
   }
-
-  // Finds a filter attribute in the read schema and converts it to a `BoundReference`
-  private def toRef(attr: String): Option[BoundReference] = {
-    requiredSchema.getFieldIndex(attr).map { index =>
-      val field = requiredSchema(index)
-      BoundReference(requiredSchema.fieldIndex(attr), field.dataType, field.nullable)
-    }
-  }
-}
-
-object CSVFilters {
-  private def checkFilterRefs(filter: sources.Filter, schema: StructType): Boolean = {
-    val fieldNames = schema.fields.map(_.name).toSet
-    filter.references.forall(fieldNames.contains(_))
-  }
-
-  /**
-   * Returns the filters currently supported by CSV datasource.
-   * @param filters The filters pushed down to CSV datasource.
-   * @param schema data schema of CSV files.
-   * @return a sub-set of `filters` that can be handled by CSV datasource.
-   */
-  def pushedFilters(filters: Array[sources.Filter], schema: StructType): Array[sources.Filter] = {
-    filters.filter(checkFilterRefs(_, schema))
-  }
-
-  private def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = {
-    a.zip(b).headOption
-  }
-
-  private def toLiteral(value: Any): Option[Literal] = {
-    Try(Literal(value)).toOption
-  }
-
-  /**
-   * Converts a filter to an expression and binds it to row positions.
-   *
-   * @param filter The filter to convert.
-   * @param toRef The function converts a filter attribute to a bound reference.
-   * @return some expression with resolved attributes or None if the conversion
-   *         of the given filter to an expression is impossible.
-   */
-  def filterToExpression(
-      filter: sources.Filter,
-      toRef: String => Option[BoundReference]): Option[Expression] = {
-    def zipAttributeAndValue(name: String, value: Any): Option[(BoundReference, Literal)] = {
-      zip(toRef(name), toLiteral(value))
-    }
-    def translate(filter: sources.Filter): Option[Expression] = filter match {
-      case sources.And(left, right) =>
-        zip(translate(left), translate(right)).map(And.tupled)
-      case sources.Or(left, right) =>
-        zip(translate(left), translate(right)).map(Or.tupled)
-      case sources.Not(child) =>
-        translate(child).map(Not)
-      case sources.EqualTo(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(EqualTo.tupled)
-      case sources.EqualNullSafe(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled)
-      case sources.IsNull(attribute) =>
-        toRef(attribute).map(IsNull)
-      case sources.IsNotNull(attribute) =>
-        toRef(attribute).map(IsNotNull)
-      case sources.In(attribute, values) =>
-        val literals = values.toSeq.flatMap(toLiteral)
-        if (literals.length == values.length) {
-          toRef(attribute).map(In(_, literals))
-        } else {
-          None
-        }
-      case sources.GreaterThan(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(GreaterThan.tupled)
-      case sources.GreaterThanOrEqual(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled)
-      case sources.LessThan(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(LessThan.tupled)
-      case sources.LessThanOrEqual(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled)
-      case sources.StringContains(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(Contains.tupled)
-      case sources.StringStartsWith(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(StartsWith.tupled)
-      case sources.StringEndsWith(attribute, value) =>
-        zipAttributeAndValue(attribute, value).map(EndsWith.tupled)
-      case sources.AlwaysTrue() =>
-        Some(Literal(true, BooleanType))
-      case sources.AlwaysFalse() =>
-        Some(Literal(false, BooleanType))
-    }
-    translate(filter)
-  }
+
+  // CSV filters are applied sequentially, and there is no need to track which
+  // filter references point to already set row values. The `reset()` method is
+  // trivial because the filters don't have any state.
+  def reset(): Unit = {}
 }
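
As a usage note, here is a hedged sketch, again not from the PR, of the contract a parser follows when driving these filters: call `reset()` once per record, then `skipRow(row, i)` right after field `i` is set, so the rest of a record can be skipped as soon as a fully bound predicate fails. The two-column schema, the filter, and the CSVFiltersDemo object name are invented for illustration.

import org.apache.spark.sql.catalyst.csv.CSVFilters
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

// Hypothetical demo object, not part of the PR.
object CSVFiltersDemo extends App {
  val schema = new StructType().add("a", IntegerType).add("b", IntegerType)
  // GreaterThan("a", 10) references only field 0, so CSVFilters groups it
  // under index 0 and can evaluate it before field "b" is even parsed.
  val csvFilters = new CSVFilters(Seq(sources.GreaterThan("a", 10)), schema)

  val row = new SpecificInternalRow(schema.map(_.dataType))
  csvFilters.reset()  // trivial for CSV, but part of the StructFilters contract
  row.setInt(0, 5)    // field "a" has just been parsed
  println(csvFilters.skipRow(row, 0)) // true: 5 > 10 fails, the row is skipped early
}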