diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 9ddbacbbc3e6e..da76f8eeaf350 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -87,7 +87,11 @@ class JacksonParser(
   private def makeStructRootConverter(st: StructType): JsonParser => Iterable[InternalRow] = {
     val elementConverter = makeConverter(st)
     val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
-    val jsonFilters = new JsonFilters(filters, st)
+    val jsonFilters = if (SQLConf.get.jsonFilterPushDown) {
+      new JsonFilters(filters, st)
+    } else {
+      new NoopFilters
+    }
     (parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
       case START_OBJECT => convertObject(parser, st, fieldConverters, jsonFilters)
       // SPARK-3308: support reading top level JSON arrays and take every element
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala
index 09022bfc15403..d6adbe83584e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonFilters.scala
@@ -89,39 +89,37 @@ class JsonFilters(pushedFilters: Seq[sources.Filter], schema: StructType)
   //   1: Array(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
   private val predicates: Array[Array[JsonPredicate]] = {
     val groupedPredicates = Array.fill(schema.length)(Array.empty[JsonPredicate])
-    if (SQLConf.get.jsonFilterPushDown) {
-      val groupedByRefSet: Map[Set[String], JsonPredicate] = filters
-        // Group filters that have the same set of references. For example:
-        //   IsNotNull("i") -> Set("i"), AlwaysTrue -> Set(),
-        //   Or(EqualTo("i", 0), StringStartsWith("s", "abc")) -> Set("i", "s")
-        // By grouping filters we could avoid tracking their state of references in the
-        // current row separately.
-        .groupBy(_.references.toSet)
-        // Combine all filters from the same group by `And` because all filters should
-        // return `true` to do not skip a row. The result is compiled to a predicate.
-        .map { case (refSet, refsFilters) =>
-          (refSet, JsonPredicate(toPredicate(refsFilters), refSet.size))
-        }
-      // Apply predicates w/o references like `AlwaysTrue` and `AlwaysFalse` to all fields.
-      // We cannot set such predicates to a particular position because skipRow() can
-      // be invoked for any index due to unpredictable order of JSON fields in JSON records.
-      val withLiterals: Map[Set[String], JsonPredicate] = groupedByRefSet.map {
-        case (refSet, pred) if refSet.isEmpty =>
-          (schema.fields.map(_.name).toSet, pred.copy(totalRefs = 1))
-        case others => others
-      }
-      // Build a map where key is only one field and value is seq of predicates refer to the field
-      //   "i" -> Seq(AlwaysTrue, IsNotNull("i"), Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
-      //   "s" -> Seq(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
-      val groupedByFields: Map[String, Seq[(String, JsonPredicate)]] = withLiterals.toSeq
-        .flatMap { case (refSet, pred) => refSet.map((_, pred)) }
-        .groupBy(_._1)
-      // Build the final array by converting keys of `groupedByFields` to their
-      // indexes in the provided schema.
-      groupedByFields.foreach { case (fieldName, fieldPredicates) =>
-        val fieldIndex = schema.fieldIndex(fieldName)
-        groupedPredicates(fieldIndex) = fieldPredicates.map(_._2).toArray
-      }
+    val groupedByRefSet: Map[Set[String], JsonPredicate] = filters
+      // Group filters that have the same set of references. For example:
+      //   IsNotNull("i") -> Set("i"), AlwaysTrue -> Set(),
+      //   Or(EqualTo("i", 0), StringStartsWith("s", "abc")) -> Set("i", "s")
+      // By grouping filters, we avoid tracking the state of their references in the
+      // current row separately.
+      .groupBy(_.references.toSet)
+      // Combine all filters from the same group with `And` because all filters must
+      // return `true` for a row not to be skipped. The result is compiled to a predicate.
+      .map { case (refSet, refsFilters) =>
+        (refSet, JsonPredicate(toPredicate(refsFilters), refSet.size))
+      }
+    // Apply predicates w/o references like `AlwaysTrue` and `AlwaysFalse` to all fields.
+    // We cannot assign such predicates to a particular position because skipRow() can
+    // be invoked for any index due to the unpredictable order of fields in JSON records.
+    val withLiterals: Map[Set[String], JsonPredicate] = groupedByRefSet.map {
+      case (refSet, pred) if refSet.isEmpty =>
+        (schema.fields.map(_.name).toSet, pred.copy(totalRefs = 1))
+      case others => others
+    }
+    // Build a map from each single field to the seq of predicates that refer to it:
+    //   "i" -> Seq(AlwaysTrue, IsNotNull("i"), Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
+    //   "s" -> Seq(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
+    val groupedByFields: Map[String, Seq[(String, JsonPredicate)]] = withLiterals.toSeq
+      .flatMap { case (refSet, pred) => refSet.map((_, pred)) }
+      .groupBy(_._1)
+    // Build the final array by converting keys of `groupedByFields` to their
+    // indexes in the provided schema.
+    groupedByFields.foreach { case (fieldName, fieldPredicates) =>
+      val fieldIndex = schema.fieldIndex(fieldName)
+      groupedPredicates(fieldIndex) = fieldPredicates.map(_._2).toArray
     }
     groupedPredicates
   }
@@ -139,6 +137,9 @@ class JsonFilters(pushedFilters: Seq[sources.Filter], schema: StructType)
    * return `false`. It returns `false` if all predicates return `true`.
    */
   def skipRow(row: InternalRow, index: Int): Boolean = {
+    assert(0 <= index && index < schema.fields.length,
+      s"The index $index is out of the valid range [0, ${schema.fields.length}). " +
+      s"It must point to a field of the schema: ${schema.catalogString}.")
     var skip = false
     for (pred <- predicates(index) if !skip) {
       pred.refCount -= 1
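
Note: the following is a minimal, self-contained sketch of the grouping strategy used by
`JsonFilters.predicates` above. The names `Filter`, `Pred`, `group`, and the case classes are
hypothetical stand-ins for illustration, not Spark's actual API, and the compilation of a filter
group to a Catalyst row predicate via `toPredicate` is stubbed out as a plain collection of filters.

// Minimal sketch of the grouping in JsonFilters.predicates (hypothetical types,
// not Spark's API). A "predicate" here is just the filter group plus the number
// of distinct fields it must wait for before it can be evaluated.
object FilterGroupingSketch {
  sealed trait Filter { def references: Seq[String] }
  case class IsNotNull(attr: String) extends Filter { def references: Seq[String] = Seq(attr) }
  case class EqualTo(attr: String, value: Any) extends Filter { def references: Seq[String] = Seq(attr) }
  case class Or(left: Filter, right: Filter) extends Filter {
    def references: Seq[String] = left.references ++ right.references
  }
  case object AlwaysTrue extends Filter { def references: Seq[String] = Seq.empty }

  case class Pred(filters: Seq[Filter], totalRefs: Int)

  def group(filters: Seq[Filter], schemaFields: Seq[String]): Map[String, Seq[Pred]] = {
    // 1. Group filters by the set of fields they reference; each group becomes
    //    one predicate (the real code ANDs and compiles the group here).
    val groupedByRefSet: Map[Set[String], Pred] =
      filters.groupBy(_.references.toSet).map { case (refSet, fs) =>
        (refSet, Pred(fs, refSet.size))
      }
    // 2. Filters without references (e.g. AlwaysTrue) apply to every field,
    //    since skipRow() may be called first for any field of the record.
    val withLiterals: Map[Set[String], Pred] = groupedByRefSet.map {
      case (refSet, pred) if refSet.isEmpty => (schemaFields.toSet, pred.copy(totalRefs = 1))
      case other => other
    }
    // 3. Fan out: register each predicate under every field it references.
    withLiterals.toSeq
      .flatMap { case (refSet, pred) => refSet.map(_ -> pred) }
      .groupBy(_._1)
      .map { case (field, preds) => (field, preds.map(_._2)) }
  }

  def main(args: Array[String]): Unit = {
    val pushed = Seq(IsNotNull("i"), AlwaysTrue, Or(EqualTo("i", 0), EqualTo("s", "abc")))
    group(pushed, Seq("i", "s")).foreach(println)
    // "i" gets three predicates ({i}, {i, s}, and the literal group);
    // "s" gets two ({i, s} and the literal group), matching the class comment above.
  }
}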
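The assert added to skipRow guards the countdown scheme those predicates rely on. Below is a
sketch of that contract under the same simplifications (`CountdownPred` and `RowSkipper` are
illustrative names, not Spark classes, and fields are addressed by name rather than by index):
each predicate keeps a per-row refCount initialized to the number of fields it references; each
time one of those fields is parsed, skipRow decrements the count, and the predicate is evaluated
exactly once, when the count reaches zero.

// Sketch of the skipRow countdown contract (hypothetical classes, not Spark's).
// A predicate fires only once all fields it references are set in the current row.
final class CountdownPred(val eval: Map[String, Any] => Boolean, val totalRefs: Int) {
  var refCount: Int = totalRefs
  def reset(): Unit = refCount = totalRefs
}

final class RowSkipper(predsPerField: Map[String, Seq[CountdownPred]], schemaFields: Seq[String]) {
  // Returns true as soon as one fully-resolved predicate evaluates to false,
  // meaning the rest of the record does not need to be parsed.
  def skipRow(row: Map[String, Any], field: String): Boolean = {
    // Mirrors the new assert: the field must belong to the schema.
    assert(schemaFields.contains(field), s"$field is not a field of the schema")
    var skip = false
    for (pred <- predsPerField.getOrElse(field, Seq.empty) if !skip) {
      pred.refCount -= 1
      if (pred.refCount == 0) skip = !pred.eval(row)
    }
    skip
  }
}

Because each predicate is attached to every field it references and evaluated only when the last
of those fields arrives, a row is tested at most once per predicate regardless of the order in
which Jackson yields the fields.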