@@ -444,69 +444,81 @@ object DataSourceStrategy {
}
}

private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match {
case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), a: Attribute) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))

case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))

case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThan(a.name, convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
Some(sources.LessThan(a.name, convertToScala(v, t)))

case expressions.LessThan(a: Attribute, Literal(v, t)) =>
Some(sources.LessThan(a.name, convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThan(a.name, convertToScala(v, t)))

case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))

case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))

case expressions.InSet(a: Attribute, set) =>
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, set.toArray.map(toScala)))

// Because we only convert In to InSet in Optimizer when there are more than certain
// items. So it is possible we still get an In expression here that needs to be pushed
// down.
case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(_.eval(EmptyRow))
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, hSet.toArray.map(toScala)))

case expressions.IsNull(a: Attribute) =>
Some(sources.IsNull(a.name))
case expressions.IsNotNull(a: Attribute) =>
Some(sources.IsNotNull(a.name))
case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringStartsWith(a.name, v.toString))

case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringEndsWith(a.name, v.toString))

case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringContains(a.name, v.toString))

case expressions.Literal(true, BooleanType) =>
Some(sources.AlwaysTrue)

case expressions.Literal(false, BooleanType) =>
Some(sources.AlwaysFalse)

case _ => None
private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = {
  // Recursively build a column name from the top level, flattening a chain of struct
  // field accesses into a dot-separated name. Returns None for any other expression,
  // and for an expression that resolves to a struct value itself, since filters can
  // only be pushed down on leaf fields.
  def attrName(e: Expression): Option[String] = {
    def helper(e: Expression): Option[String] = e match {
      case a: Attribute =>
        Some(a.name)
      case s: GetStructField =>
        helper(s.child).map(_ + s".${s.childSchema(s.ordinal).name}")
      case _ =>
        None
    }
    if (e.dataType.isInstanceOf[StructType]) None else helper(e)
  }

predicate match {
case expressions.EqualTo(e: Expression, Literal(v, t)) =>
attrName(e).map(name => sources.EqualTo(name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), e: Expression) =>
attrName(e).map(name => sources.EqualTo(name, convertToScala(v, t)))

case expressions.EqualNullSafe(e: Expression, Literal(v, t)) =>
attrName(e).map(name => sources.EqualNullSafe(name, convertToScala(v, t)))
case expressions.EqualNullSafe(Literal(v, t), e: Expression) =>
attrName(e).map(name => sources.EqualNullSafe(name, convertToScala(v, t)))

case expressions.GreaterThan(e: Expression, Literal(v, t)) =>
attrName(e).map(name => sources.GreaterThan(name, convertToScala(v, t)))
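      // When the literal is on the left-hand side, the comparison is flipped as it is
      // rewritten against the attribute: `v > e` becomes `e < v`, and similarly for the
      // flipped arms below.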
case expressions.GreaterThan(Literal(v, t), e: Expression) =>
attrName(e).map(name => sources.LessThan(name, convertToScala(v, t)))

case expressions.LessThan(e: Expression, Literal(v, t)) =>
attrName(e).map(name => sources.LessThan(name, convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), e: Expression) =>
attrName(e).map(name => sources.GreaterThan(name, convertToScala(v, t)))

case expressions.GreaterThanOrEqual(e: Expression, Literal(v, t)) =>
attrName(e).map(name => sources.GreaterThanOrEqual(name, convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), e: Expression) =>
attrName(e).map(name => sources.LessThanOrEqual(name, convertToScala(v, t)))

case expressions.LessThanOrEqual(e: Expression, Literal(v, t)) =>
attrName(e).map(name => sources.LessThanOrEqual(name, convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), e: Expression) =>
attrName(e).map(name => sources.GreaterThanOrEqual(name, convertToScala(v, t)))

case expressions.InSet(e: Expression, set) =>
val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
attrName(e).map(name => sources.In(name, set.toArray.map(toScala)))

      // The Optimizer converts In to InSet only when the list has more than a certain
      // number of items, so it is still possible to get an In expression here that
      // needs to be pushed down.
case expressions.In(e: Expression, list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(_.eval(EmptyRow))
val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
attrName(e).map(name => sources.In(name, hSet.toArray.map(toScala)))

case expressions.IsNull(e: Expression) =>
attrName(e).map(name => sources.IsNull(name))
case expressions.IsNotNull(e: Expression) =>
attrName(e).map(name => sources.IsNotNull(name))
case expressions.StartsWith(e: Expression, Literal(v: UTF8String, StringType)) =>
attrName(e).map(name => sources.StringStartsWith(name, v.toString))

case expressions.EndsWith(e: Expression, Literal(v: UTF8String, StringType)) =>
attrName(e).map(name => sources.StringEndsWith(name, v.toString))

case expressions.Contains(e: Expression, Literal(v: UTF8String, StringType)) =>
attrName(e).map(name => sources.StringContains(name, v.toString))

case expressions.Literal(true, BooleanType) =>
Some(sources.AlwaysTrue)

case expressions.Literal(false, BooleanType) =>
Some(sources.AlwaysFalse)

case _ => None
}
}
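
A small standalone sketch of the name resolution above (illustrative Scala only, not Spark code: Attr and GetField are hypothetical stand-ins for Spark's Attribute and GetStructField):

sealed trait Expr { def isStruct: Boolean }
case class Attr(name: String, isStruct: Boolean = false) extends Expr
case class GetField(child: Expr, field: String, isStruct: Boolean = false) extends Expr

// Flatten a chain of struct accesses into a dot-separated name; a struct value
// itself is not a pushable leaf, so it resolves to None.
def attrName(e: Expr): Option[String] = {
  def helper(e: Expr): Option[String] = e match {
    case Attr(name, _)             => Some(name)
    case GetField(child, field, _) => helper(child).map(_ + "." + field)
  }
  if (e.isStruct) None else helper(e)
}

val city = GetField(GetField(Attr("person", isStruct = true), "address", isStruct = true), "city")
assert(attrName(city) == Some("person.address.city")) // nested leaf: pushable
assert(attrName(city.child).isEmpty)                  // intermediate struct: not pushable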

@@ -27,7 +27,7 @@ import scala.collection.JavaConverters.asScalaBufferConverter
import org.apache.parquet.filter2.predicate._
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator}
import org.apache.parquet.schema._
import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
@@ -49,16 +49,47 @@ class ParquetFilters(
pushDownInFilterThreshold: Int,
caseSensitive: Boolean) {
// A map from Parquet column paths to their field info, used if predicate push-down applies.
private val nameToParquetField : Map[String, ParquetField] = {
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
val primitiveFields =
schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
f.getName -> ParquetField(f.getName,
ParquetSchemaType(f.getOriginalType,
f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
private val nameToParquetField: Map[String, ParquetPrimitiveField] = {
def canPushDownField(field: Type): Boolean = {
if (field.getName.contains(".")) {
// Parquet does not allow dots in the column name because dots are used as a column
// path delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter
// predicates with missing columns, so pushing down a filter for a column whose name
// contains dots could return incorrect results. Thus, we do not push down such
// filters. See SPARK-20364.
false
} else {
field match {
case _: PrimitiveType => true
// Parquet only supports push-down for primitive types, so Map and List types are
// filtered out here. Note that when g is a struct, g.getOriginalType is null;
// when g is a Map it is MAP; and when g is a List it is LIST. This is how structs,
// which we can recurse into, are distinguished from maps and lists.
case g: GroupType if g.getOriginalType == null => true
case _ => false
}
}
}

def getFieldMapHelper(
fields: Seq[Type],
baseName: Option[String] = None): Seq[(String, ParquetPrimitiveField)] = {
fields.filter(canPushDownField).flatMap { field =>
val name = baseName.map(_ + "." + field.getName).getOrElse(field.getName)
field match {
case p: PrimitiveType =>
val primitiveField = ParquetPrimitiveField(fieldName = name,
fieldType = ParquetSchemaType(p.getOriginalType,
p.getPrimitiveTypeName, p.getTypeLength, p.getDecimalMetadata))
Some((name, primitiveField))
case g: GroupType =>
getFieldMapHelper(g.getFields.asScala, Some(name))
}
}
}

val primitiveFields = getFieldMapHelper(schema.getFields.asScala)

if (caseSensitive) {
primitiveFields.toMap
} else {
@@ -74,13 +105,14 @@
}
}


/**
* Holds the information for a single field stored in the underlying Parquet file.
*
* @param fieldName field name in parquet file
* @param fieldType field type related info in parquet file
*/
private case class ParquetField(
private case class ParquetPrimitiveField(
fieldName: String,
fieldType: ParquetSchemaType)
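
To make the shape of this map concrete, here is a hedged, self-contained re-creation of the traversal using Parquet's schema parser (leafNames below is a hypothetical stand-in for the private getFieldMapHelper):

import scala.collection.JavaConverters.asScalaBufferConverter
import org.apache.parquet.schema.{GroupType, MessageTypeParser, PrimitiveType, Type}

val schema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  required int32 id;
    |  required group address {
    |    required binary city (UTF8);
    |  }
    |}""".stripMargin)

// Collect the dotted names of all pushable leaves, mirroring getFieldMapHelper.
def leafNames(fields: Seq[Type], base: Option[String] = None): Seq[String] =
  fields.flatMap { f =>
    val name = base.map(_ + "." + f.getName).getOrElse(f.getName)
    f match {
      case _: PrimitiveType => Seq(name)
      case g: GroupType if g.getOriginalType == null =>
        leafNames(g.getFields.asScala, Some(name))
      case _ => Nil // maps, lists (and dotted names, in the real code) are skipped
    }
  }

println(leafNames(schema.getFields.asScala)) // prints: Buffer(id, address.city)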

@@ -466,13 +498,8 @@ class ParquetFilters(
case _ => false
}

// Parquet does not allow dots in the column name because dots are used as a column path
// delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
// with missing columns. The incorrect results could be got from Parquet when we push down
// filters for the column having dots in the names. Thus, we do not push down such filters.
// See SPARK-20364.
private def canMakeFilterOn(name: String, value: Any): Boolean = {
nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
nameToParquetField.contains(name) && valueCanMakeFilterOn(name, value)
}
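
With the dots check moved into canPushDownField, the only name-level gate left here is map membership. Continuing the hypothetical leafNames sketch above:

val pushable = leafNames(schema.getFields.asScala).toSet
assert(pushable("address.city")) // nested leaf: now eligible for push-down
assert(!pushable("address"))     // the struct itself is not a leaf and stays out
// A column whose name literally contains dots never enters the map in the first
// place (canPushDownField rejects it), so it cannot be filtered on here either.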

@@ -59,8 +59,7 @@ case class OrcScanBuilder(
// changed `hadoopConf` in executors.
OrcInputFormat.setSearchArgument(hadoopConf, f, schema.fieldNames)
}
val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
_pushedFilters = OrcFilters.convertibleFilters(schema, dataTypeMap, filters).toArray
_pushedFilters = OrcFilters.convertibleFilters(schema, filters).toArray
}
filters
}
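
For context, a hedged sketch of inputs to the simplified call above (illustrative values; OrcFilters.convertibleFilters is internal, so the call is shown commented out). The point of the change is that type information for dotted names is now derived from the StructType itself instead of a flat name-to-type map:

import org.apache.spark.sql.sources
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("address", StructType(Seq(StructField("city", StringType))))))

val filters = Seq(
  sources.EqualTo("address.city", "Berlin"), // nested leaf reference
  sources.IsNotNull("id"))                   // top-level leaf reference

// As in OrcScanBuilder above, no dataTypeMap needs to be built or kept in sync:
// val convertible = OrcFilters.convertibleFilters(schema, filters)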