Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ class JacksonParser(
private def makeStructRootConverter(st: StructType): JsonParser => Iterable[InternalRow] = {
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
val jsonFilters = new JsonFilters(filters, st)
val jsonFilters = if (SQLConf.get.jsonFilterPushDown) {
new JsonFilters(filters, st)
} else {
new NoopFilters
}
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
case START_OBJECT => convertObject(parser, st, fieldConverters, jsonFilters)
// SPARK-3308: support reading top level JSON arrays and take every element
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,39 +89,37 @@ class JsonFilters(pushedFilters: Seq[sources.Filter], schema: StructType)
// 1: Array(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
private val predicates: Array[Array[JsonPredicate]] = {
val groupedPredicates = Array.fill(schema.length)(Array.empty[JsonPredicate])
if (SQLConf.get.jsonFilterPushDown) {
val groupedByRefSet: Map[Set[String], JsonPredicate] = filters
// Group filters that have the same set of references. For example:
// IsNotNull("i") -> Set("i"), AlwaysTrue -> Set(),
// Or(EqualTo("i", 0), StringStartsWith("s", "abc")) -> Set("i", "s")
// By grouping filters we could avoid tracking their state of references in the
// current row separately.
.groupBy(_.references.toSet)
// Combine all filters from the same group by `And` because all filters should
// return `true` to do not skip a row. The result is compiled to a predicate.
.map { case (refSet, refsFilters) =>
(refSet, JsonPredicate(toPredicate(refsFilters), refSet.size))
}
// Apply predicates w/o references like `AlwaysTrue` and `AlwaysFalse` to all fields.
// We cannot set such predicates to a particular position because skipRow() can
// be invoked for any index due to unpredictable order of JSON fields in JSON records.
val withLiterals: Map[Set[String], JsonPredicate] = groupedByRefSet.map {
case (refSet, pred) if refSet.isEmpty =>
(schema.fields.map(_.name).toSet, pred.copy(totalRefs = 1))
case others => others
}
// Build a map where key is only one field and value is seq of predicates refer to the field
// "i" -> Seq(AlwaysTrue, IsNotNull("i"), Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
// "s" -> Seq(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
val groupedByFields: Map[String, Seq[(String, JsonPredicate)]] = withLiterals.toSeq
.flatMap { case (refSet, pred) => refSet.map((_, pred)) }
.groupBy(_._1)
// Build the final array by converting keys of `groupedByFields` to their
// indexes in the provided schema.
groupedByFields.foreach { case (fieldName, fieldPredicates) =>
val fieldIndex = schema.fieldIndex(fieldName)
groupedPredicates(fieldIndex) = fieldPredicates.map(_._2).toArray
val groupedByRefSet: Map[Set[String], JsonPredicate] = filters
// Group filters that have the same set of references. For example:
// IsNotNull("i") -> Set("i"), AlwaysTrue -> Set(),
// Or(EqualTo("i", 0), StringStartsWith("s", "abc")) -> Set("i", "s")
// By grouping filters we could avoid tracking their state of references in the
// current row separately.
.groupBy(_.references.toSet)
// Combine all filters from the same group by `And` because all filters should
// return `true` to do not skip a row. The result is compiled to a predicate.
.map { case (refSet, refsFilters) =>
(refSet, JsonPredicate(toPredicate(refsFilters), refSet.size))
}
// Apply predicates w/o references like `AlwaysTrue` and `AlwaysFalse` to all fields.
// We cannot set such predicates to a particular position because skipRow() can
// be invoked for any index due to unpredictable order of JSON fields in JSON records.
val withLiterals: Map[Set[String], JsonPredicate] = groupedByRefSet.map {
case (refSet, pred) if refSet.isEmpty =>
(schema.fields.map(_.name).toSet, pred.copy(totalRefs = 1))
case others => others
}
// Build a map where key is only one field and value is seq of predicates refer to the field
// "i" -> Seq(AlwaysTrue, IsNotNull("i"), Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
// "s" -> Seq(AlwaysTrue, Or(EqualTo("i", 0), StringStartsWith("s", "abc")))
val groupedByFields: Map[String, Seq[(String, JsonPredicate)]] = withLiterals.toSeq
.flatMap { case (refSet, pred) => refSet.map((_, pred)) }
.groupBy(_._1)
// Build the final array by converting keys of `groupedByFields` to their
// indexes in the provided schema.
groupedByFields.foreach { case (fieldName, fieldPredicates) =>
val fieldIndex = schema.fieldIndex(fieldName)
groupedPredicates(fieldIndex) = fieldPredicates.map(_._2).toArray
}
groupedPredicates
}
Expand All @@ -139,6 +137,9 @@ class JsonFilters(pushedFilters: Seq[sources.Filter], schema: StructType)
* return `false`. It returns `false` if all predicates return `true`.
*/
def skipRow(row: InternalRow, index: Int): Boolean = {
assert(0 <= index && index < schema.fields.length,
s"The index $index is out of the valid range [0, ${schema.fields.length}). " +
s"It must point out to a field of the schema: ${schema.catalogString}.")
var skip = false
for (pred <- predicates(index) if !skip) {
pred.refCount -= 1
Expand Down