diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala
new file mode 100644
index 0000000000000..fed1b323f5773
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/StructFilters.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.util.Try
+
+import org.apache.spark.sql.catalyst.StructFilters._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+/**
+ * The class provides an API for applying pushed down filters to partially or
+ * fully set internal rows that have the struct schema.
+ *
+ * `StructFilters` assumes that:
+ *   - `reset()` is called before any `skipRow()` calls for each new row.
+ *
+ * @param pushedFilters The pushed down source filters. The filters should refer to
+ *                      the fields of the provided schema.
+ * @param schema The required schema of records from datasource files.
+ */
+abstract class StructFilters(pushedFilters: Seq[sources.Filter], schema: StructType) {
+
+  protected val filters = StructFilters.pushedFilters(pushedFilters.toArray, schema)
+
+  /**
+   * Applies pushed down source filters to the given row assuming that
+   * the value at `index` has already been set.
+   *
+   * @param row The row with fully or partially set values.
+   * @param index The index of the already set value.
+   * @return `true` if the currently processed row can be skipped, otherwise `false`.
+   */
+  def skipRow(row: InternalRow, index: Int): Boolean
+
+  /**
+   * Resets the states of the pushed down filters. The method must be called before
+   * processing any new row, otherwise `skipRow()` may return a wrong result.
+   */
+  def reset(): Unit
+
+  /**
+   * Compiles source filters to a predicate.
+   */
+  def toPredicate(filters: Seq[sources.Filter]): BasePredicate = {
+    val reducedExpr = filters
+      .sortBy(_.references.length)
+      .flatMap(filterToExpression(_, toRef))
+      .reduce(And)
+    Predicate.create(reducedExpr)
+  }
+
+  // Finds a filter attribute in the schema and converts it to a `BoundReference`.
+  private def toRef(attr: String): Option[BoundReference] = {
+    // The names have been normalized and case sensitivity is not a concern here.
+    schema.getFieldIndex(attr).map { index =>
+      val field = schema(index)
+      BoundReference(index, field.dataType, field.nullable)
+    }
+  }
+}
+
+object StructFilters {
+  private def checkFilterRefs(filter: sources.Filter, fieldNames: Set[String]): Boolean = {
+    // The names have been normalized and case sensitivity is not a concern here.
+ filter.references.forall(fieldNames.contains) + } + + /** + * Returns the filters currently supported by the datasource. + * @param filters The filters pushed down to the datasource. + * @param schema data schema of datasource files. + * @return a sub-set of `filters` that can be handled by the datasource. + */ + def pushedFilters(filters: Array[sources.Filter], schema: StructType): Array[sources.Filter] = { + val fieldNames = schema.fieldNames.toSet + filters.filter(checkFilterRefs(_, fieldNames)) + } + + private def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = { + a.zip(b).headOption + } + + private def toLiteral(value: Any): Option[Literal] = { + Try(Literal(value)).toOption + } + + /** + * Converts a filter to an expression and binds it to row positions. + * + * @param filter The filter to convert. + * @param toRef The function converts a filter attribute to a bound reference. + * @return some expression with resolved attributes or `None` if the conversion + * of the given filter to an expression is impossible. + */ + def filterToExpression( + filter: sources.Filter, + toRef: String => Option[BoundReference]): Option[Expression] = { + def zipAttributeAndValue(name: String, value: Any): Option[(BoundReference, Literal)] = { + zip(toRef(name), toLiteral(value)) + } + def translate(filter: sources.Filter): Option[Expression] = filter match { + case sources.And(left, right) => + zip(translate(left), translate(right)).map(And.tupled) + case sources.Or(left, right) => + zip(translate(left), translate(right)).map(Or.tupled) + case sources.Not(child) => + translate(child).map(Not) + case sources.EqualTo(attribute, value) => + zipAttributeAndValue(attribute, value).map(EqualTo.tupled) + case sources.EqualNullSafe(attribute, value) => + zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled) + case sources.IsNull(attribute) => + toRef(attribute).map(IsNull) + case sources.IsNotNull(attribute) => + toRef(attribute).map(IsNotNull) + case sources.In(attribute, values) => + val literals = values.toSeq.flatMap(toLiteral) + if (literals.length == values.length) { + toRef(attribute).map(In(_, literals)) + } else { + None + } + case sources.GreaterThan(attribute, value) => + zipAttributeAndValue(attribute, value).map(GreaterThan.tupled) + case sources.GreaterThanOrEqual(attribute, value) => + zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled) + case sources.LessThan(attribute, value) => + zipAttributeAndValue(attribute, value).map(LessThan.tupled) + case sources.LessThanOrEqual(attribute, value) => + zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled) + case sources.StringContains(attribute, value) => + zipAttributeAndValue(attribute, value).map(Contains.tupled) + case sources.StringStartsWith(attribute, value) => + zipAttributeAndValue(attribute, value).map(StartsWith.tupled) + case sources.StringEndsWith(attribute, value) => + zipAttributeAndValue(attribute, value).map(EndsWith.tupled) + case sources.AlwaysTrue() => + Some(Literal(true, BooleanType)) + case sources.AlwaysFalse() => + Some(Literal(false, BooleanType)) + } + translate(filter) + } +} + +class NoopFilters extends StructFilters(Seq.empty, new StructType()) { + override def skipRow(row: InternalRow, index: Int): Boolean = false + override def reset(): Unit = {} +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala index b50a76a496556..d2cb2c4d8134a 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVFilters.scala @@ -17,13 +17,11 @@ package org.apache.spark.sql.catalyst.csv -import scala.util.Try - -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, StructFilters} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources -import org.apache.spark.sql.types.{BooleanType, StructType} +import org.apache.spark.sql.types.StructType /** * An instance of the class compiles filters to predicates and allows to @@ -33,7 +31,8 @@ import org.apache.spark.sql.types.{BooleanType, StructType} * @param filters The filters pushed down to CSV datasource. * @param requiredSchema The schema with only fields requested by the upper layer. */ -class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) { +class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) + extends StructFilters(filters, requiredSchema) { /** * Converted filters to predicates and grouped by maximum field index * in the read schema. For example, if an filter refers to 2 attributes @@ -54,30 +53,27 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) { for (filter <- filters) { val refs = filter.references val index = if (refs.isEmpty) { - // For example, AlwaysTrue and AlwaysFalse doesn't have any references + // For example, `AlwaysTrue` and `AlwaysFalse` doesn't have any references // Filters w/o refs always return the same result. Taking into account - // that predicates are combined via And, we can apply such filters only + // that predicates are combined via `And`, we can apply such filters only // once at the position 0. 0 } else { // readSchema must contain attributes of all filters. - // Accordingly, fieldIndex() returns a valid index always. + // Accordingly, `fieldIndex()` returns a valid index always. refs.map(requiredSchema.fieldIndex).max } groupedFilters(index) :+= filter } if (len > 0 && !groupedFilters(0).isEmpty) { - // We assume that filters w/o refs like AlwaysTrue and AlwaysFalse + // We assume that filters w/o refs like `AlwaysTrue` and `AlwaysFalse` // can be evaluated faster that others. We put them in front of others. val (literals, others) = groupedFilters(0).partition(_.references.isEmpty) groupedFilters(0) = literals ++ others } for (i <- 0 until len) { if (!groupedFilters(i).isEmpty) { - val reducedExpr = groupedFilters(i) - .flatMap(CSVFilters.filterToExpression(_, toRef)) - .reduce(And) - groupedPredicates(i) = Predicate.create(reducedExpr) + groupedPredicates(i) = toPredicate(groupedFilters(i)) } } } @@ -85,107 +81,21 @@ class CSVFilters(filters: Seq[sources.Filter], requiredSchema: StructType) { } /** - * Applies all filters that refer to row fields at the positions from 0 to index. + * Applies all filters that refer to row fields at the positions from 0 to `index`. * @param row The internal row to check. * @param index Maximum field index. The function assumes that all fields - * from 0 to index position are set. - * @return false iff row fields at the position from 0 to index pass filters + * from 0 to `index` position are set. + * @return false` iff row fields at the position from 0 to `index` pass filters * or there are no applicable filters - * otherwise false if at least one of the filters returns false. 
+ * otherwise `false` if at least one of the filters returns `false`. */ def skipRow(row: InternalRow, index: Int): Boolean = { val predicate = predicates(index) predicate != null && !predicate.eval(row) } - // Finds a filter attribute in the read schema and converts it to a `BoundReference` - private def toRef(attr: String): Option[BoundReference] = { - requiredSchema.getFieldIndex(attr).map { index => - val field = requiredSchema(index) - BoundReference(requiredSchema.fieldIndex(attr), field.dataType, field.nullable) - } - } -} - -object CSVFilters { - private def checkFilterRefs(filter: sources.Filter, schema: StructType): Boolean = { - val fieldNames = schema.fields.map(_.name).toSet - filter.references.forall(fieldNames.contains(_)) - } - - /** - * Returns the filters currently supported by CSV datasource. - * @param filters The filters pushed down to CSV datasource. - * @param schema data schema of CSV files. - * @return a sub-set of `filters` that can be handled by CSV datasource. - */ - def pushedFilters(filters: Array[sources.Filter], schema: StructType): Array[sources.Filter] = { - filters.filter(checkFilterRefs(_, schema)) - } - - private def zip[A, B](a: Option[A], b: Option[B]): Option[(A, B)] = { - a.zip(b).headOption - } - - private def toLiteral(value: Any): Option[Literal] = { - Try(Literal(value)).toOption - } - - /** - * Converts a filter to an expression and binds it to row positions. - * - * @param filter The filter to convert. - * @param toRef The function converts a filter attribute to a bound reference. - * @return some expression with resolved attributes or None if the conversion - * of the given filter to an expression is impossible. - */ - def filterToExpression( - filter: sources.Filter, - toRef: String => Option[BoundReference]): Option[Expression] = { - def zipAttributeAndValue(name: String, value: Any): Option[(BoundReference, Literal)] = { - zip(toRef(name), toLiteral(value)) - } - def translate(filter: sources.Filter): Option[Expression] = filter match { - case sources.And(left, right) => - zip(translate(left), translate(right)).map(And.tupled) - case sources.Or(left, right) => - zip(translate(left), translate(right)).map(Or.tupled) - case sources.Not(child) => - translate(child).map(Not) - case sources.EqualTo(attribute, value) => - zipAttributeAndValue(attribute, value).map(EqualTo.tupled) - case sources.EqualNullSafe(attribute, value) => - zipAttributeAndValue(attribute, value).map(EqualNullSafe.tupled) - case sources.IsNull(attribute) => - toRef(attribute).map(IsNull) - case sources.IsNotNull(attribute) => - toRef(attribute).map(IsNotNull) - case sources.In(attribute, values) => - val literals = values.toSeq.flatMap(toLiteral) - if (literals.length == values.length) { - toRef(attribute).map(In(_, literals)) - } else { - None - } - case sources.GreaterThan(attribute, value) => - zipAttributeAndValue(attribute, value).map(GreaterThan.tupled) - case sources.GreaterThanOrEqual(attribute, value) => - zipAttributeAndValue(attribute, value).map(GreaterThanOrEqual.tupled) - case sources.LessThan(attribute, value) => - zipAttributeAndValue(attribute, value).map(LessThan.tupled) - case sources.LessThanOrEqual(attribute, value) => - zipAttributeAndValue(attribute, value).map(LessThanOrEqual.tupled) - case sources.StringContains(attribute, value) => - zipAttributeAndValue(attribute, value).map(Contains.tupled) - case sources.StringStartsWith(attribute, value) => - zipAttributeAndValue(attribute, value).map(StartsWith.tupled) - case sources.StringEndsWith(attribute, 
value) => - zipAttributeAndValue(attribute, value).map(EndsWith.tupled) - case sources.AlwaysTrue() => - Some(Literal(true, BooleanType)) - case sources.AlwaysFalse() => - Some(Literal(false, BooleanType)) - } - translate(filter) - } + // CSV filters are applied sequentially, and no need to track which filter references + // point out to already set row values. The `reset()` method is trivial because + // the filters don't have any states. + def reset(): Unit = {} } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 7f69180e87e7e..9ddbacbbc3e6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -27,11 +27,12 @@ import com.fasterxml.jackson.core._ import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} import org.apache.spark.util.Utils @@ -42,7 +43,8 @@ import org.apache.spark.util.Utils class JacksonParser( schema: DataType, val options: JSONOptions, - allowArrayAsStructs: Boolean) extends Logging { + allowArrayAsStructs: Boolean, + filters: Seq[Filter] = Seq.empty) extends Logging { import JacksonUtils._ import com.fasterxml.jackson.core.JsonToken._ @@ -85,8 +87,9 @@ class JacksonParser( private def makeStructRootConverter(st: StructType): JsonParser => Iterable[InternalRow] = { val elementConverter = makeConverter(st) val fieldConverters = st.map(_.dataType).map(makeConverter).toArray + val jsonFilters = new JsonFilters(filters, st) (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) { - case START_OBJECT => Some(convertObject(parser, st, fieldConverters)) + case START_OBJECT => convertObject(parser, st, fieldConverters, jsonFilters) // SPARK-3308: support reading top level JSON arrays and take every element // in such an array as a row // @@ -146,7 +149,7 @@ class JacksonParser( // val st = at.elementType.asInstanceOf[StructType] val fieldConverters = st.map(_.dataType).map(makeConverter).toArray - Some(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters))))) + Some(InternalRow(new GenericArrayData(convertObject(parser, st, fieldConverters).toArray))) } } @@ -291,7 +294,7 @@ class JacksonParser( case st: StructType => val fieldConverters = st.map(_.dataType).map(makeConverter).toArray (parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) { - case START_OBJECT => convertObject(parser, st, fieldConverters) + case START_OBJECT => convertObject(parser, st, fieldConverters).get } case at: ArrayType => @@ -375,15 +378,19 @@ class JacksonParser( private def convertObject( parser: JsonParser, schema: StructType, - fieldConverters: Array[ValueConverter]): InternalRow = { + fieldConverters: Array[ValueConverter], + structFilters: StructFilters = new NoopFilters()): Option[InternalRow] = { val row = new GenericInternalRow(schema.length) var 
badRecordException: Option[Throwable] = None + var skipRow = false - while (nextUntil(parser, JsonToken.END_OBJECT)) { + structFilters.reset() + while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) { schema.getFieldIndex(parser.getCurrentName) match { case Some(index) => try { row.update(index, fieldConverters(index).apply(parser)) + skipRow = structFilters.skipRow(row, index) } catch { case e: SparkUpgradeException => throw e case NonFatal(e) => @@ -395,8 +402,10 @@ class JacksonParser( } } - if (badRecordException.isEmpty) { - row + if (skipRow) { + None + } else if (badRecordException.isEmpty) { + Some(row) } else { throw PartialResultException(row, badRecordException.get) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala new file mode 100644 index 0000000000000..09022bfc15403 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +import org.apache.spark.sql.catalyst.{InternalRow, StructFilters} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.StructType + +/** + * The class provides API for applying pushed down source filters to rows with + * a struct schema parsed from JSON records. The class should be used in this way: + * 1. Before processing of the next row, `JacksonParser` (parser for short) resets the internal + * state of `JsonFilters` by calling the `reset()` method. + * 2. The parser reads JSON fields one-by-one in streaming fashion. It converts an incoming + * field value to the desired type from the schema. After that, it sets the value to an instance + * of `InternalRow` at the position according to the schema. Order of parsed JSON fields can + * be different from the order in the schema. + * 3. Per every JSON field of the top-level JSON object, the parser calls `skipRow` by passing + * an `InternalRow` in which some of fields can be already set, and the position of the JSON + * field according to the schema. + * 3.1 `skipRow` finds a group of predicates that refers to this JSON field. + * 3.2 Per each predicate from the group, `skipRow` decrements its reference counter. + * 3.2.1 If predicate reference counter becomes 0, it means that all predicate attributes have + * been already set in the internal row, and the predicate can be applied to it. `skipRow` + * invokes the predicate for the row. + * 3.3 `skipRow` applies predicates until one of them returns `false`. 
In that case, the method
+ *       returns `true` to the parser.
+ *   3.4 If all predicates with a zero reference counter return `true`, the final result of
+ *       the method is `false`, which tells the parser not to skip the row.
+ * 4. If the parser gets `true` from `JsonFilters.skipRow`, it must not call the method anymore
+ *    for this internal row, and should go to step 1.
+ *
+ * Besides the `StructFilters` assumptions, `JsonFilters` assumes that:
+ *   - `skipRow()` can be called for any valid index of the struct fields,
+ *     and in any order.
+ *   - After `skipRow()` returns `true`, the internal state of `JsonFilters` can be inconsistent,
+ *     so `skipRow()` must not be called again for the current row without `reset()`.
+ *
+ * @param pushedFilters The pushed down source filters. The filters should refer to
+ *                      the fields of the provided schema.
+ * @param schema The required schema of records from datasource files.
+ */
+class JsonFilters(pushedFilters: Seq[sources.Filter], schema: StructType)
+  extends StructFilters(pushedFilters, schema) {
+
+  /**
+   * Stateful JSON predicate that keeps track of its dependent references in the
+   * current row via `refCount`.
+   *
+   * @param predicate The predicate compiled from pushed down source filters.
+   * @param totalRefs The total number of filter references that the predicate
+   *                  was compiled from.
+   */
+  case class JsonPredicate(predicate: BasePredicate, totalRefs: Int) {
+    // The current number of predicate references in the row that have not been set yet.
+    // When `refCount` reaches zero, all of the predicate's dependencies are set, and it can
+    // be applied to the row.
+    var refCount: Int = totalRefs
+
+    def reset(): Unit = {
+      refCount = totalRefs
+    }
+  }
+
+  // Predicates compiled from the pushed down filters. The predicates are grouped by their
+  // attributes. The i-th group contains predicates that refer to the i-th field of the given
+  // schema. A predicate can be placed in many groups if it has many attributes. For example:
+  //   schema: i INTEGER, s STRING
+  //   filters: IsNotNull("i"), AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc"))
+  //   predicates:
+  //     0: Array(IsNotNull("i"), AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
+  //     1: Array(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
+  private val predicates: Array[Array[JsonPredicate]] = {
+    val groupedPredicates = Array.fill(schema.length)(Array.empty[JsonPredicate])
+    if (SQLConf.get.jsonFilterPushDown) {
+      val groupedByRefSet: Map[Set[String], JsonPredicate] = filters
+        // Group filters that have the same set of references. For example:
+        //   IsNotNull("i") -> Set("i"), AlwaysTrue -> Set(),
+        //   Or(EqualTo("i", 0), StringStartsWith("s", "abc")) -> Set("i", "s")
+        // By grouping the filters, we avoid tracking the state of their references in the
+        // current row separately.
+        .groupBy(_.references.toSet)
+        // Combine all filters from the same group by `And` because all filters must
+        // return `true` for a row not to be skipped. The result is compiled to a predicate.
+        .map { case (refSet, refsFilters) =>
+          (refSet, JsonPredicate(toPredicate(refsFilters), refSet.size))
+        }
+      // Apply predicates w/o references like `AlwaysTrue` and `AlwaysFalse` to all fields.
+      // We cannot assign such predicates to a particular position because `skipRow()` can
+      // be invoked for any index due to the unpredictable order of JSON fields in JSON records.
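+      // For the example above, the predicate compiled from `AlwaysTrue` (empty reference set)
+      // is re-keyed to Set("i", "s") with `totalRefs = 1` by the mapping below, so it ends up
+      // in the predicate groups of both fields.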
+      val withLiterals: Map[Set[String], JsonPredicate] = groupedByRefSet.map {
+        case (refSet, pred) if refSet.isEmpty =>
+          (schema.fields.map(_.name).toSet, pred.copy(totalRefs = 1))
+        case others => others
+      }
+      // Build a map where the key is a single field name and the value is the seq of
+      // predicates that refer to that field:
+      //   "i" -> Seq(AlwaysTrue, IsNotNull("i"), Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
+      //   "s" -> Seq(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
+      val groupedByFields: Map[String, Seq[(String, JsonPredicate)]] = withLiterals.toSeq
+        .flatMap { case (refSet, pred) => refSet.map((_, pred)) }
+        .groupBy(_._1)
+      // Build the final array by converting keys of `groupedByFields` to their
+      // indexes in the provided schema.
+      groupedByFields.foreach { case (fieldName, fieldPredicates) =>
+        val fieldIndex = schema.fieldIndex(fieldName)
+        groupedPredicates(fieldIndex) = fieldPredicates.map(_._2).toArray
+      }
+    }
+    groupedPredicates
+  }
+
+  /**
+   * Applies the predicates (compiled filters) associated with the row field value
+   * at the position `index` only if the other dependencies of those predicates are
+   * already set in the given row.
+   *
+   * Note: If the function returns `true`, the `refCount` of some predicates may not be decremented.
+   *
+   * @param row The row with fully or partially set values.
+   * @param index The index of the already set value.
+   * @return `true` if at least one of the applicable predicates (those whose dependent
+   *         row values are all set) returns `false`. It returns `false` if all predicates
+   *         return `true`.
+   */
+  def skipRow(row: InternalRow, index: Int): Boolean = {
+    var skip = false
+    for (pred <- predicates(index) if !skip) {
+      pred.refCount -= 1
+      assert(pred.refCount >= 0,
+        s"Predicate reference counter cannot be negative but got ${pred.refCount}.")
+      skip = pred.refCount == 0 && !pred.predicate.eval(row)
+    }
+    skip
+  }
+
+  /**
+   * Resets the states of all predicates by re-initializing their reference counters.
+ */ + override def reset(): Unit = predicates.foreach(_.foreach(_.reset)) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9be0497e46603..d1122dacf4de7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2543,6 +2543,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val JSON_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.json.filterPushdown.enabled") + .doc("When true, enable filter pushdown to JSON datasource.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val ADD_PARTITION_BATCH_SIZE = buildConf("spark.sql.addPartitionInBatch.size") .internal() @@ -3255,6 +3261,8 @@ class SQLConf extends Serializable with Logging { def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED) + def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED) + def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID) def legacyAllowCastNumericToTimestamp: Boolean = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/StructFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/StructFiltersSuite.scala new file mode 100644 index 0000000000000..3893b2b07c519 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/StructFiltersSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.sources +import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, Filter} +import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.unsafe.types.UTF8String + +abstract class StructFiltersSuite extends SparkFunSuite { + + protected def createFilters(filters: Seq[sources.Filter], schema: StructType): StructFilters + + test("filter to expression conversion") { + val ref = BoundReference(0, IntegerType, true) + def check(f: Filter, expr: Expression): Unit = { + assert(StructFilters.filterToExpression(f, _ => Some(ref)).get === expr) + } + + check(sources.AlwaysTrue, Literal(true)) + check(sources.AlwaysFalse, Literal(false)) + check(sources.IsNull("a"), IsNull(ref)) + check(sources.Not(sources.IsNull("a")), Not(IsNull(ref))) + check(sources.IsNotNull("a"), IsNotNull(ref)) + check(sources.EqualTo("a", "b"), EqualTo(ref, Literal("b"))) + check(sources.EqualNullSafe("a", "b"), EqualNullSafe(ref, Literal("b"))) + check(sources.StringStartsWith("a", "b"), StartsWith(ref, Literal("b"))) + check(sources.StringEndsWith("a", "b"), EndsWith(ref, Literal("b"))) + check(sources.StringContains("a", "b"), Contains(ref, Literal("b"))) + check(sources.LessThanOrEqual("a", 1), LessThanOrEqual(ref, Literal(1))) + check(sources.LessThan("a", 1), LessThan(ref, Literal(1))) + check(sources.GreaterThanOrEqual("a", 1), GreaterThanOrEqual(ref, Literal(1))) + check(sources.GreaterThan("a", 1), GreaterThan(ref, Literal(1))) + check(sources.And(sources.AlwaysTrue, sources.AlwaysTrue), And(Literal(true), Literal(true))) + check(sources.Or(sources.AlwaysTrue, sources.AlwaysTrue), Or(Literal(true), Literal(true))) + check(sources.In("a", Array(1)), In(ref, Seq(Literal(1)))) + } + + private def getSchema(str: String): StructType = str match { + case "" => new StructType() + case _ => StructType.fromDDL(str) + } + + test("skipping rows") { + def check( + requiredSchema: String = "i INTEGER, d DOUBLE", + filters: Seq[Filter], + row: InternalRow, + pos: Int, + skip: Boolean): Unit = { + val structFilters = createFilters(filters, getSchema(requiredSchema)) + structFilters.reset() + assert(structFilters.skipRow(row, pos) === skip) + } + + check(filters = Seq(), row = InternalRow(3.14), pos = 0, skip = false) + check(filters = Seq(AlwaysTrue), row = InternalRow(1), pos = 0, skip = false) + check(filters = Seq(AlwaysFalse), row = InternalRow(1), pos = 0, skip = true) + check( + filters = Seq(sources.EqualTo("i", 1), sources.LessThan("d", 10), sources.AlwaysFalse), + row = InternalRow(1, 3.14), + pos = 0, + skip = true) + check( + filters = Seq(sources.EqualTo("i", 10)), + row = InternalRow(10, 3.14), + pos = 0, + skip = false) + check( + filters = Seq(sources.IsNotNull("d"), sources.GreaterThanOrEqual("d", 2.96)), + row = InternalRow(3.14), + pos = 0, + skip = false) + check( + filters = Seq(sources.In("i", Array(10, 20)), sources.LessThanOrEqual("d", 2.96)), + row = InternalRow(10, 3.14), + pos = 1, + skip = true) + val filters1 = Seq( + sources.Or( + sources.AlwaysTrue, + sources.And( + sources.Not(sources.IsNull("i")), + sources.Not( + sources.And( + sources.StringEndsWith("s", "ab"), + sources.StringEndsWith("s", "cd") + ) + ) + ) + ), + sources.GreaterThan("d", 0), + sources.LessThan("i", 500) + ) + val filters2 = Seq( + sources.And( + sources.StringContains("s", "abc"), + sources.And( + sources.Not(sources.IsNull("i")), + 
sources.And( + sources.StringEndsWith("s", "ab"), + sources.StringEndsWith("s", "bc") + ) + ) + ), + sources.GreaterThan("d", 100), + sources.LessThan("i", 0) + ) + Seq(filters1 -> false, filters2 -> true).foreach { case (filters, skip) => + val schema = "i INTEGER, d DOUBLE, s STRING" + val row = InternalRow(10, 3.14, UTF8String.fromString("abc")) + val structFilters = createFilters(filters, getSchema(schema)) + structFilters.reset() + for { p <- 0 until 3 if !skip } { + assert(structFilters.skipRow(row, p) === skip, s"p = $p filters = $filters skip = $skip") + } + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala index 499bbaf452aee..21bef20d7d4d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVFiltersSuite.scala @@ -17,118 +17,12 @@ package org.apache.spark.sql.catalyst.csv -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.{StructFilters, StructFiltersSuite} import org.apache.spark.sql.sources -import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, Filter} -import org.apache.spark.sql.types.{IntegerType, StructType} -import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.sql.types.StructType -class CSVFiltersSuite extends SparkFunSuite { - test("filter to expression conversion") { - val ref = BoundReference(0, IntegerType, true) - def check(f: Filter, expr: Expression): Unit = { - assert(CSVFilters.filterToExpression(f, _ => Some(ref)).get === expr) - } - - check(sources.AlwaysTrue, Literal(true)) - check(sources.AlwaysFalse, Literal(false)) - check(sources.IsNull("a"), IsNull(ref)) - check(sources.Not(sources.IsNull("a")), Not(IsNull(ref))) - check(sources.IsNotNull("a"), IsNotNull(ref)) - check(sources.EqualTo("a", "b"), EqualTo(ref, Literal("b"))) - check(sources.EqualNullSafe("a", "b"), EqualNullSafe(ref, Literal("b"))) - check(sources.StringStartsWith("a", "b"), StartsWith(ref, Literal("b"))) - check(sources.StringEndsWith("a", "b"), EndsWith(ref, Literal("b"))) - check(sources.StringContains("a", "b"), Contains(ref, Literal("b"))) - check(sources.LessThanOrEqual("a", 1), LessThanOrEqual(ref, Literal(1))) - check(sources.LessThan("a", 1), LessThan(ref, Literal(1))) - check(sources.GreaterThanOrEqual("a", 1), GreaterThanOrEqual(ref, Literal(1))) - check(sources.GreaterThan("a", 1), GreaterThan(ref, Literal(1))) - check(sources.And(sources.AlwaysTrue, sources.AlwaysTrue), And(Literal(true), Literal(true))) - check(sources.Or(sources.AlwaysTrue, sources.AlwaysTrue), Or(Literal(true), Literal(true))) - check(sources.In("a", Array(1)), In(ref, Seq(Literal(1)))) - } - - private def getSchema(str: String): StructType = str match { - case "" => new StructType() - case _ => StructType.fromDDL(str) - } - - test("skipping rows") { - def check( - requiredSchema: String = "i INTEGER, d DOUBLE", - filters: Seq[Filter], - row: InternalRow, - pos: Int, - skip: Boolean): Unit = { - val csvFilters = new CSVFilters(filters, getSchema(requiredSchema)) - assert(csvFilters.skipRow(row, pos) === skip) - } - - check(filters = Seq(), row = InternalRow(3.14), pos = 0, skip = false) - check(filters = Seq(AlwaysTrue), row = InternalRow(1), pos = 0, skip = false) - check(filters = Seq(AlwaysFalse), row 
= InternalRow(1), pos = 0, skip = true) - check( - filters = Seq(sources.EqualTo("i", 1), sources.LessThan("d", 10), sources.AlwaysFalse), - row = InternalRow(1, 3.14), - pos = 0, - skip = true) - check( - filters = Seq(sources.EqualTo("i", 10)), - row = InternalRow(10, 3.14), - pos = 0, - skip = false) - check( - filters = Seq(sources.IsNotNull("d"), sources.GreaterThanOrEqual("d", 2.96)), - row = InternalRow(3.14), - pos = 0, - skip = false) - check( - filters = Seq(sources.In("i", Array(10, 20)), sources.LessThanOrEqual("d", 2.96)), - row = InternalRow(10, 3.14), - pos = 1, - skip = true) - val filters1 = Seq( - sources.Or( - sources.AlwaysTrue, - sources.And( - sources.Not(sources.IsNull("i")), - sources.Not( - sources.And( - sources.StringEndsWith("s", "ab"), - sources.StringEndsWith("s", "cd") - ) - ) - ) - ), - sources.GreaterThan("d", 0), - sources.LessThan("i", 500) - ) - val filters2 = Seq( - sources.And( - sources.StringContains("s", "abc"), - sources.And( - sources.Not(sources.IsNull("i")), - sources.And( - sources.StringEndsWith("s", "ab"), - sources.StringEndsWith("s", "bc") - ) - ) - ), - sources.GreaterThan("d", 100), - sources.LessThan("i", 0) - ) - Seq(filters1 -> false, filters2 -> true).foreach { case (filters, skip) => - for (p <- 0 until 3) { - check( - requiredSchema = "i INTEGER, d DOUBLE, s STRING", - filters = filters, - row = InternalRow(10, 3.14, UTF8String.fromString("abc")), - pos = p, - skip = skip) - } - } +class CSVFiltersSuite extends StructFiltersSuite { + override def createFilters(filters: Seq[sources.Filter], schema: StructType): StructFilters = { + new CSVFilters(filters, schema) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala new file mode 100644 index 0000000000000..587e22e787b87 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonParserSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class JacksonParserSuite extends SparkFunSuite { + test("skipping rows using pushdown filters") { + def check( + input: String = """{"i":1, "s": "a"}""", + schema: StructType = StructType.fromDDL("i INTEGER"), + filters: Seq[Filter], + expected: Seq[InternalRow]): Unit = { + val options = new JSONOptions(Map.empty[String, String], "GMT", "") + val parser = new JacksonParser(schema, options, false, filters) + val createParser = CreateJacksonParser.string _ + val actual = parser.parse(input, createParser, UTF8String.fromString) + assert(actual === expected) + } + + check(filters = Seq(), expected = Seq(InternalRow(1))) + check(filters = Seq(EqualTo("i", 1)), expected = Seq(InternalRow(1))) + check(filters = Seq(EqualTo("i", 2)), expected = Seq.empty) + check( + schema = StructType.fromDDL("s STRING"), + filters = Seq(StringStartsWith("s", "b")), + expected = Seq.empty) + check( + schema = StructType.fromDDL("i INTEGER, s STRING"), + filters = Seq(StringStartsWith("s", "a")), + expected = Seq(InternalRow(1, UTF8String.fromString("a")))) + check( + input = """{"i":1,"s": "a", "d": 3.14}""", + schema = StructType.fromDDL("i INTEGER, d DOUBLE"), + filters = Seq(EqualTo("d", 3.14)), + expected = Seq(InternalRow(1, 3.14))) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonFiltersSuite.scala new file mode 100644 index 0000000000000..82f5e71d67964 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonFiltersSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.json + +import org.apache.spark.sql.catalyst.{StructFilters, StructFiltersSuite} +import org.apache.spark.sql.sources +import org.apache.spark.sql.types.StructType + +class JsonFiltersSuite extends StructFiltersSuite { + override def createFilters(filters: Seq[sources.Filter], schema: StructType): StructFilters = { + new JsonFilters(filters, schema) + } +} diff --git a/sql/core/benchmarks/CSVBenchmark-jdk11-results.txt b/sql/core/benchmarks/CSVBenchmark-jdk11-results.txt index 0e82b632793d2..03c51ddad1c62 100644 --- a/sql/core/benchmarks/CSVBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/CSVBenchmark-jdk11-results.txt @@ -6,62 +6,62 @@ OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-106 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -One quoted string 46568 46683 198 0.0 931358.6 1.0X +One quoted string 53332 53484 194 0.0 1066633.5 1.0X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Wide rows with 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 1000 columns 129836 130796 1404 0.0 129836.0 1.0X -Select 100 columns 40444 40679 261 0.0 40443.5 3.2X -Select one column 33429 33475 73 0.0 33428.6 3.9X -count() 7967 8047 73 0.1 7966.7 16.3X -Select 100 columns, one bad input field 90639 90832 266 0.0 90638.6 1.4X -Select 100 columns, corrupt record field 109023 109084 74 0.0 109023.3 1.2X +Select 1000 columns 127472 128337 1185 0.0 127472.4 1.0X +Select 100 columns 43731 43856 130 0.0 43730.7 2.9X +Select one column 37347 37401 47 0.0 37347.4 3.4X +count() 8014 8028 25 0.1 8013.8 15.9X +Select 100 columns, one bad input field 95603 95726 106 0.0 95603.0 1.3X +Select 100 columns, corrupt record field 111851 111969 171 0.0 111851.4 1.1X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Count a dataset with 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns + count() 20685 20707 35 0.5 2068.5 1.0X -Select 1 column + count() 13096 13149 49 0.8 1309.6 1.6X -count() 3994 4001 7 2.5 399.4 5.2X +Select 10 columns + count() 20364 20481 110 0.5 2036.4 1.0X +Select 1 column + count() 14706 14803 153 0.7 1470.6 1.4X +count() 3855 3880 32 2.6 385.5 5.3X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 2169 2203 32 4.6 216.9 1.0X -to_csv(timestamp) 14401 14591 168 0.7 1440.1 0.2X -write timestamps to files 13209 13276 59 0.8 1320.9 0.2X -Create a dataset of dates 2231 2248 17 4.5 223.1 1.0X -to_csv(date) 10406 10473 68 1.0 1040.6 0.2X -write dates to files 7970 7976 9 1.3 797.0 0.3X +Create a dataset of timestamps 2191 2205 
14 4.6 219.1 1.0X +to_csv(timestamp) 13209 13253 43 0.8 1320.9 0.2X +write timestamps to files 12300 12380 71 0.8 1230.0 0.2X +Create a dataset of dates 2254 2269 14 4.4 225.4 1.0X +to_csv(date) 7980 8006 22 1.3 798.0 0.3X +write dates to files 7076 7100 26 1.4 707.6 0.3X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -read timestamp text from files 2387 2391 6 4.2 238.7 1.0X -read timestamps from files 53503 53593 124 0.2 5350.3 0.0X -infer timestamps from files 107988 108668 647 0.1 10798.8 0.0X -read date text from files 2121 2133 12 4.7 212.1 1.1X -read date from files 29983 30039 48 0.3 2998.3 0.1X -infer date from files 30196 30436 218 0.3 3019.6 0.1X -timestamp strings 3098 3109 10 3.2 309.8 0.8X -parse timestamps from Dataset[String] 63331 63426 84 0.2 6333.1 0.0X -infer timestamps from Dataset[String] 124003 124463 490 0.1 12400.3 0.0X -date strings 3423 3429 11 2.9 342.3 0.7X -parse dates from Dataset[String] 34235 34314 76 0.3 3423.5 0.1X -from_csv(timestamp) 60829 61600 668 0.2 6082.9 0.0X -from_csv(date) 33047 33173 139 0.3 3304.7 0.1X +read timestamp text from files 2405 2408 5 4.2 240.5 1.0X +read timestamps from files 54502 54624 207 0.2 5450.2 0.0X +infer timestamps from files 112896 113040 135 0.1 11289.6 0.0X +read date text from files 2127 2141 23 4.7 212.7 1.1X +read date from files 30229 30257 29 0.3 3022.9 0.1X +infer date from files 28156 28621 409 0.4 2815.6 0.1X +timestamp strings 3096 3097 1 3.2 309.6 0.8X +parse timestamps from Dataset[String] 63096 63751 571 0.2 6309.6 0.0X +infer timestamps from Dataset[String] 120916 121262 556 0.1 12091.6 0.0X +date strings 3445 3457 13 2.9 344.5 0.7X +parse dates from Dataset[String] 37481 37585 91 0.3 3748.1 0.1X +from_csv(timestamp) 57933 57996 69 0.2 5793.3 0.0X +from_csv(date) 35312 35469 164 0.3 3531.2 0.1X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -w/o filters 28752 28765 16 0.0 287516.5 1.0X -pushdown disabled 28856 28880 22 0.0 288556.3 1.0X -w/ filters 1714 1731 15 0.1 17137.3 16.8X +w/o filters 24751 24829 67 0.0 247510.6 1.0X +pushdown disabled 24856 24889 29 0.0 248558.7 1.0X +w/ filters 1881 1892 11 0.1 18814.4 13.2X diff --git a/sql/core/benchmarks/CSVBenchmark-results.txt b/sql/core/benchmarks/CSVBenchmark-results.txt index a3af46c037bf9..a0d8c0c6fd492 100644 --- a/sql/core/benchmarks/CSVBenchmark-results.txt +++ b/sql/core/benchmarks/CSVBenchmark-results.txt @@ -6,62 +6,62 @@ OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aw Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -One quoted string 45457 45731 344 0.0 909136.8 1.0X +One quoted string 47588 47831 244 0.0 951755.4 1.0X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz 
Wide rows with 1000 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 1000 columns 129646 130527 1412 0.0 129646.3 1.0X -Select 100 columns 42444 42551 119 0.0 42444.0 3.1X -Select one column 35415 35428 20 0.0 35414.6 3.7X -count() 11114 11128 16 0.1 11113.6 11.7X -Select 100 columns, one bad input field 93353 93670 275 0.0 93352.6 1.4X -Select 100 columns, corrupt record field 113569 113952 373 0.0 113568.8 1.1X +Select 1000 columns 129509 130323 1388 0.0 129509.4 1.0X +Select 100 columns 42474 42572 108 0.0 42473.6 3.0X +Select one column 35479 35586 93 0.0 35479.1 3.7X +count() 11021 11071 47 0.1 11021.3 11.8X +Select 100 columns, one bad input field 94652 94795 134 0.0 94652.0 1.4X +Select 100 columns, corrupt record field 115336 115542 350 0.0 115336.0 1.1X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Count a dataset with 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns + count() 18498 18589 87 0.5 1849.8 1.0X -Select 1 column + count() 11078 11095 27 0.9 1107.8 1.7X -count() 3928 3950 22 2.5 392.8 4.7X +Select 10 columns + count() 19959 20022 76 0.5 1995.9 1.0X +Select 1 column + count() 13920 13968 54 0.7 1392.0 1.4X +count() 3928 3938 11 2.5 392.8 5.1X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 1933 1940 11 5.2 193.3 1.0X -to_csv(timestamp) 18078 18243 255 0.6 1807.8 0.1X -write timestamps to files 12668 12786 134 0.8 1266.8 0.2X -Create a dataset of dates 2196 2201 5 4.6 219.6 0.9X -to_csv(date) 9583 9597 21 1.0 958.3 0.2X -write dates to files 7091 7110 20 1.4 709.1 0.3X +Create a dataset of timestamps 1940 1977 56 5.2 194.0 1.0X +to_csv(timestamp) 15398 15669 458 0.6 1539.8 0.1X +write timestamps to files 12438 12454 19 0.8 1243.8 0.2X +Create a dataset of dates 2157 2171 18 4.6 215.7 0.9X +to_csv(date) 11764 11839 95 0.9 1176.4 0.2X +write dates to files 8893 8907 12 1.1 889.3 0.2X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -read timestamp text from files 2166 2177 10 4.6 216.6 1.0X -read timestamps from files 53212 53402 281 0.2 5321.2 0.0X -infer timestamps from files 109788 110372 570 0.1 10978.8 0.0X -read date text from files 1921 1929 8 5.2 192.1 1.1X -read date from files 25470 25499 25 0.4 2547.0 0.1X -infer date from files 27201 27342 134 0.4 2720.1 0.1X -timestamp strings 3638 3653 19 2.7 363.8 0.6X -parse timestamps from Dataset[String] 61894 62532 555 0.2 6189.4 0.0X -infer timestamps from Dataset[String] 125171 125430 236 0.1 12517.1 0.0X -date strings 3736 3749 14 2.7 373.6 0.6X -parse dates from Dataset[String] 30787 30829 43 0.3 3078.7 0.1X 
-from_csv(timestamp) 60842 61035 209 0.2 6084.2 0.0X -from_csv(date) 30123 30196 95 0.3 3012.3 0.1X +read timestamp text from files 2219 2230 11 4.5 221.9 1.0X +read timestamps from files 51519 51725 192 0.2 5151.9 0.0X +infer timestamps from files 104744 104885 124 0.1 10474.4 0.0X +read date text from files 1940 1943 4 5.2 194.0 1.1X +read date from files 27099 27118 33 0.4 2709.9 0.1X +infer date from files 27662 27703 61 0.4 2766.2 0.1X +timestamp strings 4225 4242 15 2.4 422.5 0.5X +parse timestamps from Dataset[String] 56090 56479 376 0.2 5609.0 0.0X +infer timestamps from Dataset[String] 115629 116245 1049 0.1 11562.9 0.0X +date strings 4337 4344 10 2.3 433.7 0.5X +parse dates from Dataset[String] 32373 32476 120 0.3 3237.3 0.1X +from_csv(timestamp) 54952 55157 300 0.2 5495.2 0.0X +from_csv(date) 30924 30985 66 0.3 3092.4 0.1X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -w/o filters 28985 29042 80 0.0 289852.9 1.0X -pushdown disabled 29080 29146 58 0.0 290799.4 1.0X -w/ filters 2072 2084 17 0.0 20722.3 14.0X +w/o filters 25630 25636 8 0.0 256301.4 1.0X +pushdown disabled 25673 25681 9 0.0 256734.0 1.0X +w/ filters 1873 1886 15 0.1 18733.1 13.7X diff --git a/sql/core/benchmarks/JsonBenchmark-jdk11-results.txt b/sql/core/benchmarks/JsonBenchmark-jdk11-results.txt index 2d506f03d9f7e..6f68d60ce619a 100644 --- a/sql/core/benchmarks/JsonBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/JsonBenchmark-jdk11-results.txt @@ -7,106 +7,114 @@ OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-106 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz JSON schema inferring: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 73307 73400 141 1.4 733.1 1.0X -UTF-8 is set 143834 143925 152 0.7 1438.3 0.5X +No encoding 70753 71127 471 1.4 707.5 1.0X +UTF-8 is set 128105 129183 1165 0.8 1281.1 0.6X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz count a short column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 50894 51065 292 2.0 508.9 1.0X -UTF-8 is set 98462 99455 1173 1.0 984.6 0.5X +No encoding 59588 59643 73 1.7 595.9 1.0X +UTF-8 is set 97081 97122 62 1.0 970.8 0.6X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz count a wide column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 64011 64969 1001 0.2 6401.1 1.0X -UTF-8 is set 102757 102984 311 0.1 10275.7 0.6X +No encoding 58835 59259 659 0.2 5883.5 1.0X +UTF-8 is set 103117 103218 88 0.1 10311.7 0.6X Preparing data for benchmarking ... 
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz select wide row: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 132559 133561 1010 0.0 265117.3 1.0X -UTF-8 is set 151458 152129 611 0.0 302915.4 0.9X +No encoding 142993 143485 436 0.0 285985.3 1.0X +UTF-8 is set 165446 165496 60 0.0 330892.4 0.9X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Select a subset of 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns 21148 21202 87 0.5 2114.8 1.0X -Select 1 column 24701 24724 21 0.4 2470.1 0.9X +Select 10 columns 21557 21593 61 0.5 2155.7 1.0X +Select 1 column 24197 24236 35 0.4 2419.7 0.9X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz creation of JSON parser per line: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Short column without encoding 6945 6998 59 1.4 694.5 1.0X -Short column with UTF-8 11510 11569 51 0.9 1151.0 0.6X -Wide column without encoding 95004 95795 790 0.1 9500.4 0.1X -Wide column with UTF-8 149223 149409 276 0.1 14922.3 0.0X +Short column without encoding 9795 9820 29 1.0 979.5 1.0X +Short column with UTF-8 16442 16536 146 0.6 1644.2 0.6X +Wide column without encoding 99134 99475 300 0.1 9913.4 0.1X +Wide column with UTF-8 155913 156369 692 0.1 15591.3 0.1X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz JSON functions: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 649 652 3 15.4 64.9 1.0X -from_json 22284 22393 99 0.4 2228.4 0.0X -json_tuple 32310 32824 484 0.3 3231.0 0.0X -get_json_object 22111 22751 568 0.5 2211.1 0.0X +Text read 671 679 7 14.9 67.1 1.0X +from_json 25356 25432 79 0.4 2535.6 0.0X +json_tuple 29464 29927 672 0.3 2946.4 0.0X +get_json_object 21841 21877 32 0.5 2184.1 0.0X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Dataset of json strings: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 2894 2903 8 17.3 57.9 1.0X -schema inferring 26724 26785 62 1.9 534.5 0.1X -parsing 37502 37632 131 1.3 750.0 0.1X +Text read 3109 3116 12 16.1 62.2 1.0X +schema inferring 28751 28765 15 1.7 575.0 0.1X +parsing 34923 35030 151 1.4 698.5 0.1X Preparing data for benchmarking ... 
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Json files in the per-line mode: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 10994 11010 16 4.5 219.9 1.0X -Schema inferring 45654 45677 37 1.1 913.1 0.2X -Parsing without charset 34476 34559 73 1.5 689.5 0.3X -Parsing with UTF-8 56987 57002 13 0.9 1139.7 0.2X +Text read 10787 10818 32 4.6 215.7 1.0X +Schema inferring 49577 49775 184 1.0 991.5 0.2X +Parsing without charset 35343 35433 87 1.4 706.9 0.3X +Parsing with UTF-8 60253 60290 35 0.8 1205.1 0.2X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 2150 2188 35 4.7 215.0 1.0X -to_json(timestamp) 17874 18080 294 0.6 1787.4 0.1X -write timestamps to files 12518 12538 34 0.8 1251.8 0.2X -Create a dataset of dates 2298 2310 18 4.4 229.8 0.9X -to_json(date) 11673 11703 27 0.9 1167.3 0.2X -write dates to files 7121 7135 12 1.4 712.1 0.3X +Create a dataset of timestamps 2200 2209 8 4.5 220.0 1.0X +to_json(timestamp) 18410 18602 264 0.5 1841.0 0.1X +write timestamps to files 11841 12032 305 0.8 1184.1 0.2X +Create a dataset of dates 2353 2363 9 4.3 235.3 0.9X +to_json(date) 12135 12182 72 0.8 1213.5 0.2X +write dates to files 6776 6801 33 1.5 677.6 0.3X OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -read timestamp text from files 2616 2641 34 3.8 261.6 1.0X -read timestamps from files 37481 37517 58 0.3 3748.1 0.1X -infer timestamps from files 84774 84964 201 0.1 8477.4 0.0X -read date text from files 2362 2365 3 4.2 236.2 1.1X -read date from files 16583 16612 29 0.6 1658.3 0.2X -timestamp strings 3927 3963 40 2.5 392.7 0.7X -parse timestamps from Dataset[String] 52827 53004 243 0.2 5282.7 0.0X -infer timestamps from Dataset[String] 101108 101644 769 0.1 10110.8 0.0X -date strings 4886 4906 26 2.0 488.6 0.5X -parse dates from Dataset[String] 27623 27694 62 0.4 2762.3 0.1X -from_json(timestamp) 71764 71887 124 0.1 7176.4 0.0X -from_json(date) 46200 46314 99 0.2 4620.0 0.1X +read timestamp text from files 2563 2580 20 3.9 256.3 1.0X +read timestamps from files 41261 41360 97 0.2 4126.1 0.1X +infer timestamps from files 92292 92517 243 0.1 9229.2 0.0X +read date text from files 2332 2340 11 4.3 233.2 1.1X +read date from files 18753 18768 13 0.5 1875.3 0.1X +timestamp strings 3108 3123 13 3.2 310.8 0.8X +parse timestamps from Dataset[String] 51078 51448 323 0.2 5107.8 0.1X +infer timestamps from Dataset[String] 101373 101429 65 0.1 10137.3 0.0X +date strings 4126 4138 15 2.4 412.6 0.6X +parse dates from Dataset[String] 29365 29398 30 0.3 2936.5 0.1X +from_json(timestamp) 67033 67098 63 0.1 6703.3 0.0X +from_json(date) 44495 44581 125 0.2 4449.5 0.1X + +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) 
CPU E5-2670 v2 @ 2.50GHz +Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +w/o filters 30167 30223 48 0.0 301674.9 1.0X +pushdown disabled 30291 30311 30 0.0 302914.8 1.0X +w/ filters 901 915 14 0.1 9012.4 33.5X diff --git a/sql/core/benchmarks/JsonBenchmark-results.txt b/sql/core/benchmarks/JsonBenchmark-results.txt index c22118f91b3fc..38ad9d0077f9a 100644 --- a/sql/core/benchmarks/JsonBenchmark-results.txt +++ b/sql/core/benchmarks/JsonBenchmark-results.txt @@ -7,106 +7,114 @@ OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aw Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz JSON schema inferring: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 63839 64000 263 1.6 638.4 1.0X -UTF-8 is set 124633 124945 429 0.8 1246.3 0.5X +No encoding 78058 78116 76 1.3 780.6 1.0X +UTF-8 is set 125709 126521 1367 0.8 1257.1 0.6X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz count a short column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 51720 51901 157 1.9 517.2 1.0X -UTF-8 is set 91161 91190 25 1.1 911.6 0.6X +No encoding 60424 60567 188 1.7 604.2 1.0X +UTF-8 is set 92714 92864 140 1.1 927.1 0.7X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz count a wide column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 58486 59038 714 0.2 5848.6 1.0X -UTF-8 is set 103045 103350 358 0.1 10304.5 0.6X +No encoding 65047 65761 662 0.2 6504.7 1.0X +UTF-8 is set 101823 101918 113 0.1 10182.3 0.6X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz select wide row: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -No encoding 134909 135024 105 0.0 269818.6 1.0X -UTF-8 is set 154418 154593 155 0.0 308836.7 0.9X +No encoding 145471 146067 601 0.0 290941.4 1.0X +UTF-8 is set 158504 159237 635 0.0 317008.4 0.9X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Select a subset of 10 columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Select 10 columns 19538 19620 70 0.5 1953.8 1.0X -Select 1 column 26142 26159 15 0.4 2614.2 0.7X +Select 10 columns 21386 21451 112 0.5 2138.6 1.0X +Select 1 column 27172 27214 58 0.4 2717.2 0.8X Preparing data for benchmarking ... 
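The new "Filters pushdown" rows above come from the filtersPushdownBenchmark case added to JsonBenchmark further down in this diff: the same selective filter (key === 0) is run with the JSON pushdown flag enabled and disabled. At the user level the toggle looks roughly like this (a sketch under assumptions: the path and the two-column schema are illustrative, and a "noop" format write stands in for the benchmark's noop() helper just to force a full scan):

    import org.apache.spark.sql.internal.SQLConf
    import spark.implicits._

    spark.conf.set(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key, "true")   // "false" gives the "pushdown disabled" case
    spark.read.schema("key INT, col0 TIMESTAMP").json("/tmp/json")
      .filter($"key" === 0)
      .write.format("noop").mode("overwrite").save()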
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz creation of JSON parser per line: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Short column without encoding 8103 8162 53 1.2 810.3 1.0X -Short column with UTF-8 13104 13150 58 0.8 1310.4 0.6X -Wide column without encoding 135280 135593 375 0.1 13528.0 0.1X -Wide column with UTF-8 175189 175483 278 0.1 17518.9 0.0X +Short column without encoding 9283 9363 69 1.1 928.3 1.0X +Short column with UTF-8 15330 15369 61 0.7 1533.0 0.6X +Wide column without encoding 138885 139153 239 0.1 13888.5 0.1X +Wide column with UTF-8 177201 177650 501 0.1 17720.1 0.1X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz JSON functions: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 1225 1234 8 8.2 122.5 1.0X -from_json 22482 22552 95 0.4 2248.2 0.1X -json_tuple 30203 30338 146 0.3 3020.3 0.0X -get_json_object 22219 22245 26 0.5 2221.9 0.1X +Text read 1224 1243 17 8.2 122.4 1.0X +from_json 25191 25327 214 0.4 2519.1 0.0X +json_tuple 30333 30380 42 0.3 3033.3 0.0X +get_json_object 21611 21739 112 0.5 2161.1 0.1X Preparing data for benchmarking ... OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Dataset of json strings: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 5897 5904 10 8.5 117.9 1.0X -schema inferring 30282 30340 50 1.7 605.6 0.2X -parsing 33304 33577 289 1.5 666.1 0.2X +Text read 5923 5941 32 8.4 118.5 1.0X +schema inferring 34089 34238 135 1.5 681.8 0.2X +parsing 44699 45952 1108 1.1 894.0 0.1X Preparing data for benchmarking ... 
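The "Dataset of json strings" rows above measure schema inference and parsing over an in-memory Dataset[String] rather than files; roughly (a sketch with an illustrative toy dataset, assuming a spark-shell style `spark` session):

    import spark.implicits._

    val jsonDS = spark.range(0, 1000).map(i => s"""{"a": $i, "b": "v$i"}""")
    spark.read.json(jsonDS).count()                                 // "schema inferring"
    spark.read.schema("a BIGINT, b STRING").json(jsonDS).count()    // "parsing" with a known schema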
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Json files in the per-line mode: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Text read 9710 9757 80 5.1 194.2 1.0X -Schema inferring 35929 35939 9 1.4 718.6 0.3X -Parsing without charset 39175 39227 87 1.3 783.5 0.2X -Parsing with UTF-8 59188 59294 109 0.8 1183.8 0.2X +Text read 9727 9776 50 5.1 194.5 1.0X +Schema inferring 52529 52643 98 1.0 1050.6 0.2X +Parsing without charset 44563 44692 132 1.1 891.3 0.2X +Parsing with UTF-8 55558 55755 218 0.9 1111.2 0.2X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Write dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Create a dataset of timestamps 1967 1977 9 5.1 196.7 1.0X -to_json(timestamp) 17086 17304 371 0.6 1708.6 0.1X -write timestamps to files 12691 12716 28 0.8 1269.1 0.2X -Create a dataset of dates 2192 2217 39 4.6 219.2 0.9X -to_json(date) 10541 10656 137 0.9 1054.1 0.2X -write dates to files 7259 7311 46 1.4 725.9 0.3X +Create a dataset of timestamps 1945 1964 22 5.1 194.5 1.0X +to_json(timestamp) 17990 18135 249 0.6 1799.0 0.1X +write timestamps to files 13198 13234 45 0.8 1319.8 0.1X +Create a dataset of dates 2202 2213 11 4.5 220.2 0.9X +to_json(date) 11219 11240 29 0.9 1121.9 0.2X +write dates to files 6932 6966 32 1.4 693.2 0.3X OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Read dates and timestamps: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -read timestamp text from files 2318 2326 13 4.3 231.8 1.0X -read timestamps from files 43345 43627 258 0.2 4334.5 0.1X -infer timestamps from files 89570 89621 59 0.1 8957.0 0.0X -read date text from files 2099 2107 9 4.8 209.9 1.1X -read date from files 18000 18065 98 0.6 1800.0 0.1X -timestamp strings 3937 3956 32 2.5 393.7 0.6X -parse timestamps from Dataset[String] 56001 56429 539 0.2 5600.1 0.0X -infer timestamps from Dataset[String] 109410 109963 559 0.1 10941.0 0.0X -date strings 4530 4540 9 2.2 453.0 0.5X -parse dates from Dataset[String] 29723 29767 72 0.3 2972.3 0.1X -from_json(timestamp) 74106 74619 728 0.1 7410.6 0.0X -from_json(date) 46599 46632 32 0.2 4659.9 0.0X +read timestamp text from files 2354 2368 12 4.2 235.4 1.0X +read timestamps from files 43681 43771 100 0.2 4368.1 0.1X +infer timestamps from files 90608 90771 161 0.1 9060.8 0.0X +read date text from files 2121 2129 9 4.7 212.1 1.1X +read date from files 19069 19103 32 0.5 1906.9 0.1X +timestamp strings 3943 3967 24 2.5 394.3 0.6X +parse timestamps from Dataset[String] 55239 55324 74 0.2 5523.9 0.0X +infer timestamps from Dataset[String] 106155 106258 99 0.1 10615.5 0.0X +date strings 4567 4572 5 2.2 456.7 0.5X +parse dates from Dataset[String] 31258 31461 321 0.3 3125.8 0.1X +from_json(timestamp) 76499 77031 504 0.1 7649.9 0.0X +from_json(date) 44188 44199 9 0.2 4418.8 0.1X + +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws +Intel(R) Xeon(R) CPU E5-2670 v2 @ 
2.50GHz +Filters pushdown: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +w/o filters 30314 30334 28 0.0 303139.1 1.0X +pushdown disabled 30394 30429 54 0.0 303944.7 1.0X +w/ filters 906 913 8 0.1 9059.1 33.5X diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 95a63c3d1e2d9..e0fa4584185e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -125,7 +125,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister { } (file: PartitionedFile) => { - val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true) + val parser = new JacksonParser( + actualSchema, + parsedOptions, + allowArrayAsStructs = true, + filters) JsonDataSource(parsedOptions).readFile( broadcastedHadoopConf.value.value, file, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala index 81a234e254000..f7a79bf31948e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScanBuilder.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.csv import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.csv.CSVFilters +import org.apache.spark.sql.catalyst.StructFilters import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder @@ -49,7 +49,7 @@ case class CSVScanBuilder( override def pushFilters(filters: Array[Filter]): Array[Filter] = { if (sparkSession.sessionState.conf.csvFilterPushDown) { - _pushedFilters = CSVFilters.pushedFilters(filters, dataSchema) + _pushedFilters = StructFilters.pushedFilters(filters, dataSchema) } filters } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala index 698423948f916..9737803b597a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonPartitionReaderFactory.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.json.JsonDataSource import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration @@ -36,6 +37,7 @@ import org.apache.spark.util.SerializableConfiguration * @param readDataSchema Required schema of JSON files. * @param partitionSchema Schema of partitions. * @param parsedOptions Options for parsing JSON files. 
+ * @param filters The filters pushed down to JSON datasource. */ case class JsonPartitionReaderFactory( sqlConf: SQLConf, @@ -43,12 +45,17 @@ case class JsonPartitionReaderFactory( dataSchema: StructType, readDataSchema: StructType, partitionSchema: StructType, - parsedOptions: JSONOptionsInRead) extends FilePartitionReaderFactory { + parsedOptions: JSONOptionsInRead, + filters: Seq[Filter]) extends FilePartitionReaderFactory { override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { val actualSchema = StructType(readDataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)) - val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true) + val parser = new JacksonParser( + actualSchema, + parsedOptions, + allowArrayAsStructs = true, + filters) val iter = JsonDataSource(parsedOptions).readFile( broadcastedConf.value.value, partitionedFile, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala index 75231625676ff..7ad106627a083 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.json.JsonDataSource import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan} +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration @@ -39,6 +40,7 @@ case class JsonScan( readDataSchema: StructType, readPartitionSchema: StructType, options: CaseInsensitiveStringMap, + pushedFilters: Array[Filter], partitionFilters: Seq[Expression] = Seq.empty, dataFilters: Seq[Expression] = Seq.empty) extends TextBasedFileScan(sparkSession, options) { @@ -86,7 +88,7 @@ case class JsonScan( // The partition values are already truncated in `FileScan.partitions`. // We should use `readPartitionSchema` as the partition schema here. 
JsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, parsedOptions) + dataSchema, readDataSchema, readPartitionSchema, parsedOptions, pushedFilters) } override def withFilters( @@ -94,10 +96,14 @@ case class JsonScan( this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) override def equals(obj: Any): Boolean = obj match { - case j: JsonScan => super.equals(j) && dataSchema == j.dataSchema && options == j.options - + case j: JsonScan => super.equals(j) && dataSchema == j.dataSchema && options == j.options && + equivalentFilters(pushedFilters, j.pushedFilters) case _ => false } override def hashCode(): Int = super.hashCode() + + override def description(): String = { + super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala index be53b1b1676f1..cf1204566ddbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScanBuilder.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.datasources.v2.json import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.read.Scan +import org.apache.spark.sql.catalyst.StructFilters +import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -29,8 +31,26 @@ class JsonScanBuilder ( schema: StructType, dataSchema: StructType, options: CaseInsensitiveStringMap) - extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + extends FileScanBuilder(sparkSession, fileIndex, dataSchema) with SupportsPushDownFilters { override def build(): Scan = { - JsonScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options) + JsonScan( + sparkSession, + fileIndex, + dataSchema, + readDataSchema(), + readPartitionSchema(), + options, + pushedFilters()) } + + private var _pushedFilters: Array[Filter] = Array.empty + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + if (sparkSession.sessionState.conf.jsonFilterPushDown) { + _pushedFilters = StructFilters.pushedFilters(filters, dataSchema) + } + filters + } + + override def pushedFilters(): Array[Filter] = _pushedFilters } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index fcb7bdc25f08f..db6f45247d130 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2353,6 +2353,36 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa assert(df.schema.last == StructField("col_mixed_types", StringType, true)) } } + + test("case sensitivity of filters references") { + Seq(true, false).foreach { filterPushdown => + withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> 
filterPushdown.toString) { + withTempPath { path => + Seq( + """aaa,BBB""", + """0,1""", + """2,3""").toDF().repartition(1).write.text(path.getCanonicalPath) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val readback = spark.read.schema("aaa integer, BBB integer") + .option("header", true) + .csv(path.getCanonicalPath) + checkAnswer(readback, Seq(Row(2, 3), Row(0, 1))) + checkAnswer(readback.filter($"AAA" === 2 && $"bbb" === 3), Seq(Row(2, 3))) + } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val readback = spark.read.schema("aaa integer, BBB integer") + .option("header", true) + .csv(path.getCanonicalPath) + checkAnswer(readback, Seq(Row(2, 3), Row(0, 1))) + val errorMsg = intercept[AnalysisException] { + readback.filter($"AAA" === 2 && $"bbb" === 3).collect() + }.getMessage + assert(errorMsg.contains("cannot resolve '`AAA`'")) + } + } + } + } + } } class CSVv1Suite extends CSVSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala index 0dbd6b5754afb..9ff35c0946cc9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmark.scala @@ -20,9 +20,10 @@ import java.io.File import java.time.{Instant, LocalDate} import org.apache.spark.benchmark.Benchmark -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.{Column, Dataset, Row} import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -495,6 +496,45 @@ object JsonBenchmark extends SqlBasedBenchmark { } } + private def filtersPushdownBenchmark(rowsNum: Int, numIters: Int): Unit = { + val benchmark = new Benchmark("Filters pushdown", rowsNum, output = output) + val colsNum = 100 + val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", TimestampType)) + val schema = StructType(StructField("key", IntegerType) +: fields) + def columns(): Seq[Column] = { + val ts = Seq.tabulate(colsNum) { i => + lit(Instant.ofEpochSecond(i * 12345678)).as(s"col$i") + } + ($"id" % 1000).as("key") +: ts + } + withTempPath { path => + spark.range(rowsNum).select(columns(): _*).write.json(path.getAbsolutePath) + def readback = { + spark.read.schema(schema).json(path.getAbsolutePath) + } + + benchmark.addCase("w/o filters", numIters) { _ => + readback.noop() + } + + def withFilter(configEnabled: Boolean): Unit = { + withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> configEnabled.toString()) { + readback.filter($"key" === 0).noop() + } + } + + benchmark.addCase("pushdown disabled", numIters) { _ => + withFilter(configEnabled = false) + } + + benchmark.addCase("w/ filters", numIters) { _ => + withFilter(configEnabled = true) + } + + benchmark.run() + } + } + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { val numIters = 3 runBenchmark("Benchmark for performance of JSON parsing") { @@ -508,6 +548,9 @@ object JsonBenchmark extends SqlBasedBenchmark { jsonInDS(50 * 1000 * 1000, numIters) jsonInFile(50 * 1000 * 1000, numIters) datetimeBenchmark(rowsNum = 10 * 1000 * 1000, numIters) + // Benchmark pushdown filters that refer to top-level columns. + // TODO (SPARK-32325): Add benchmarks for filters with nested column attributes. 
+ filtersPushdownBenchmark(rowsNum = 100 * 1000, numIters) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index de01099f2db55..2a98cba663a1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -35,12 +35,14 @@ import org.apache.spark.sql.{functions => F, _} import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.ExternalRDD -import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, NoopCache} +import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ import org.apache.spark.sql.types.StructType.fromDDL import org.apache.spark.sql.types.TestUDT.{MyDenseVector, MyDenseVectorUDT} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils class TestFileFilter extends PathFilter { @@ -2688,6 +2690,122 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson checkAnswer(json, Row(null)) } } + + test("filters push down") { + withTempPath { path => + val t = "2019-12-17 00:01:02" + Seq( + """{"c0": "abc", "c1": {"c2": 1, "c3": "2019-11-14 20:35:30"}}""", + s"""{"c0": "def", "c1": {"c2": 2, "c3": "$t"}}""", + s"""{"c0": "defa", "c1": {"c2": 3, "c3": "$t"}}""", + s"""{"c0": "define", "c1": {"c2": 2, "c3": "$t"}}""").toDF("data") + .repartition(1) + .write.text(path.getAbsolutePath) + Seq(true, false).foreach { filterPushdown => + withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) { + Seq("PERMISSIVE", "DROPMALFORMED", "FAILFAST").foreach { mode => + val readback = spark.read + .option("mode", mode) + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .schema("c0 string, c1 struct<c2:integer,c3:timestamp>") + .json(path.getAbsolutePath) + .where($"c1.c2" === 2 && $"c0".startsWith("def")) + .select($"c1.c3") + assert(readback.count() === 2) + checkAnswer(readback, Seq(Row(Timestamp.valueOf(t)), Row(Timestamp.valueOf(t)))) + } + } + } + } + } + + test("apply filters to malformed rows") { + withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { path => + Seq( + "{}", + """{"invalid": 0}""", + """{"i":}""", + """{"i": 0}""", + """{"i": 1, "t": "2020-01-28 01:00:00"}""", + """{"t": "2020-01-28 02:00:00"}""", + """{"i": "abc", "t": "2020-01-28 03:00:00"}""", + """{"i": 2, "t": "2020-01-28 04:00:00", "d": 3.14}""").toDF("data") + .repartition(1) + .write.text(path.getAbsolutePath) + val schema = "i INTEGER, t TIMESTAMP" + val readback = spark.read + .schema(schema) + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .json(path.getAbsolutePath) + // readback: + // +----+-------------------+ + // |i |t | + // +----+-------------------+ + // |null|null | + // |null|null | + // |null|null | + // |0 |null | + // |1 |2020-01-28 01:00:00| + // |null|2020-01-28 02:00:00| + // |null|2020-01-28 03:00:00| + // |2 |2020-01-28 04:00:00| + // +----+-------------------+ + checkAnswer( + readback.where($"i".isNull && $"t".isNotNull), + Seq( + Row(null, Timestamp.valueOf("2020-01-28 02:00:00")), + Row(null, 
Timestamp.valueOf("2020-01-28 03:00:00")))) + checkAnswer( + readback.where($"i" >= 0 && $"t" > "2020-01-28 00:00:00"), + Seq( + Row(1, Timestamp.valueOf("2020-01-28 01:00:00")), + Row(2, Timestamp.valueOf("2020-01-28 04:00:00")))) + checkAnswer( + readback.where($"t".isNull).select($"i"), + Seq(Row(null), Row(null), Row(null), Row(0))) + } + } + } + + test("case sensitivity of filters references") { + Seq(true, false).foreach { filterPushdown => + withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) { + withTempPath { path => + Seq( + """{"aaa": 0, "BBB": 1}""", + """{"AAA": 2, "bbb": 3}""").toDF().write.text(path.getCanonicalPath) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + val readback = spark.read.schema("aaa integer, BBB integer") + .json(path.getCanonicalPath) + checkAnswer(readback, Seq(Row(null, null), Row(0, 1))) + checkAnswer(readback.filter($"AAA" === 0 && $"bbb" === 1), Seq(Row(0, 1))) + checkAnswer(readback.filter($"AAA" === 2 && $"bbb" === 3), Seq()) + // Schema inferring + val errorMsg = intercept[AnalysisException] { + spark.read.json(path.getCanonicalPath).collect() + }.getMessage + assert(errorMsg.contains("Found duplicate column(s) in the data schema")) + } + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val readback = spark.read.schema("aaa integer, BBB integer") + .json(path.getCanonicalPath) + checkAnswer(readback, Seq(Row(null, null), Row(0, 1))) + val errorMsg = intercept[AnalysisException] { + readback.filter($"AAA" === 0 && $"bbb" === 1).collect() + }.getMessage + assert(errorMsg.contains("cannot resolve '`AAA`'")) + // Schema inferring + val readback2 = spark.read.json(path.getCanonicalPath) + checkAnswer( + readback2.filter($"AAA" === 2).select($"AAA", $"bbb"), + Seq(Row(2, 3))) + checkAnswer(readback2.filter($"aaa" === 2).select($"AAA", $"bbb"), Seq()) + } + } + } + } + } } class JsonV1Suite extends JsonSuite { @@ -2702,6 +2820,37 @@ class JsonV2Suite extends JsonSuite { super .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "") + + test("get pushed filters") { + val attr = "col" + def getBuilder(path: String): JsonScanBuilder = { + val fileIndex = new InMemoryFileIndex( + spark, + Seq(new org.apache.hadoop.fs.Path(path, "file.json")), + Map.empty, + None, + NoopCache) + val schema = new StructType().add(attr, IntegerType) + val options = CaseInsensitiveStringMap.empty() + new JsonScanBuilder(spark, fileIndex, schema, schema, options) + } + val filters: Array[sources.Filter] = Array(sources.IsNotNull(attr)) + withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { file => + val scanBuilder = getBuilder(file.getCanonicalPath) + assert(scanBuilder.pushFilters(filters) === filters) + assert(scanBuilder.pushedFilters() === filters) + } + } + + withSQLConf(SQLConf.JSON_FILTER_PUSHDOWN_ENABLED.key -> "false") { + withTempPath { file => + val scanBuilder = getBuilder(file.getCanonicalPath) + assert(scanBuilder.pushFilters(filters) === filters) + assert(scanBuilder.pushedFilters() === Array.empty[sources.Filter]) + } + } + } } class JsonLegacyTimeParserSuite extends JsonSuite {