Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,34 @@ private[sql] object ParquetFilters {
override def inverseCanDrop(statistics: Statistics[T]): Boolean = false
}

// Dispatch modes for the string-matching Parquet pushdown filter below:
// prefix match, suffix match, and substring match.
// NOTE(review): scala.Enumeration is generally discouraged in favor of a
// sealed trait ADT, but StringFilter.Mode is referenced by makeStringFilter
// and the createFilter cases, so changing the representation here would be
// a wider refactoring (see the inheritance-based suggestion in the review).
object StringFilter extends Enumeration {
type Mode = Value
val STARTS_WITH, ENDS_WITH, CONTAINS = Value
}

case class StringFilter(
v: java.lang.String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala String is just a type alias of java.lang.String, so we don't need to be explicit here.

mode: StringFilter.Mode) extends UserDefinedPredicate[Binary] with Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to extend Serializable explicitly, since case classes are already serializable.


// Resolve the string predicate once at construction time so that `keep`
// does not re-dispatch on `mode` for every row it evaluates.
private val compare: String => Boolean = mode match {
  case StringFilter.STARTS_WITH => _.startsWith(v)
  case StringFilter.ENDS_WITH => _.endsWith(v)
  case StringFilter.CONTAINS => _.contains(v)
}

override def keep(value: Binary): Boolean = {
val str = value.toStringUsingUTF8()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: remove ()

compare(str)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using an enum to dispatch the comparator function, I'd prefer to use simple inheritance here, namely:

abstract class StringFilter extends UserDefinedPredicate[Binary] {
  override def canDrop(statistics: Statistics[Binary]): Boolean = false 
  override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false 
}

case class StringStartsWithFilter(prefix: String) extends StringFilter {
  override def keep(value: Binary): Boolean = value.toStringUsingUTF8.startsWith(prefix)
}

case class StringEndsWithFilter(suffix: String) extends StringFilter {
  override def keep(value: Binary): Boolean = value.toStringUsingUTF8.endsWith(suffix)
}

case class StringContainsFilter(substring: String) extends StringFilter {
  override def keep(value: Binary): Boolean = value.toStringUsingUTF8.contains(substring)
}

The reasons are:

  1. keep is on a performance critical code path, an extra level of indirection hurts performance.
  2. StringStartsWithFilter.canDrop can leverage statistics information:
override def canDrop(statistics: Statistics[Binary]): Boolean = {
  val max = statistics.getMax.toStringUsingUTF8
  val min = statistics.getMin.toStringUsingUTF8
  max < prefix || !min.startsWith(prefix) && min > prefix
}

It would be easier to override canDrop in the above version.

However, we should NOT add the above canDrop for now, because parquet-mr 1.7.0 suffers from PARQUET-251, which may cause wrong query results because of corrupted binary statistics information. We may add it after upgrading parquet-mr to 1.8+ (hopefully 1.8.2 since we observed slight performance regression in 1.8.1).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing that concerns me is performance. The keep method is called over every single string value, while Binary.toStringUsingUTF8 can be pretty slow. I think we can apply the same trick used in CatalystStringConverter and use a mutable UTF8String here to do all the string operations.


// Conservatively never drop a row group based on min/max statistics:
// substring/suffix matches cannot be pruned from string range stats, and
// per the review discussion parquet-mr 1.7.0 binary statistics may be
// corrupted (PARQUET-251), so even the prefix case must not rely on them.
override def canDrop(statistics: Statistics[Binary]): Boolean = false

override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
}

private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
Expand Down Expand Up @@ -164,7 +192,18 @@ private[sql] object ParquetFilters {
FilterApi.gtEq(binaryColumn(n), Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
}

private val makeStringFilter: PartialFunction[DataType,
(String, String, StringFilter.Mode) => FilterPredicate] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For types as complex as this partial function one, you may want to define a type alias for better readability. But as mentioned in my comment above, we probably need specialized filter makers for each string UDP here.

case StringType =>
(n: String, v: String, mode: StringFilter.Mode) =>
FilterApi.userDefined(binaryColumn(n),
StringFilter(v.asInstanceOf[java.lang.String], mode))
}

private val makeInSet: PartialFunction[DataType, (String, Set[Any]) => FilterPredicate] = {
case BooleanType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(booleanColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Boolean]]))
case IntegerType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(intColumn(n), SetInFilter(v.asInstanceOf[Set[java.lang.Integer]]))
Expand Down Expand Up @@ -216,6 +255,9 @@ private[sql] object ParquetFilters {
case sources.IsNotNull(name) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, null))

case sources.In(name, values) =>
makeInSet.lift(dataTypeOf(name)).map(_(name, values.toSet))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this back :) I think this is worth a separate JIRA ticket to track. Would you mind opening one and adding its ID to the PR title?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I will open another ticket.

case sources.EqualTo(name, value) =>
makeEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.Not(sources.EqualTo(name, value)) =>
Expand All @@ -236,6 +278,13 @@ private[sql] object ParquetFilters {
case sources.GreaterThanOrEqual(name, value) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))

case sources.StringStartsWith(name, value) =>
makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.STARTS_WITH))
case sources.StringEndsWith(name, value) =>
makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.ENDS_WITH))
case sources.StringContains(name, value) =>
makeStringFilter.lift(dataTypeOf(name)).map(_(name, value, StringFilter.CONTAINS))

case sources.And(lhs, rhs) =>
(createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true)
checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)

checkFilterPredicate(('_1 in(true)).asInstanceOf[Predicate],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: ('_1 in(true)) looks a little bit weird, maybe '_1.in(true)? This also applies to all the in predicates below.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I noticed that In, StartsWith, EndsWith, Contains, RLike and Like are all Expression but not Predicate. I don't see a good reason for this, and we can probably make all of them Predicate in another PR, but I may miss something here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another nit, usually we prefer the following styles if we have to wrap method arguments:

checkFilterPredicate(
  '_1.in(true).asInstanceOf[Predicate],
  classOf[UserDefinedByInstance[_, _]],
  true)

or

checkFilterPredicate(
  '_1.in(true).asInstanceOf[Predicate], classOf[UserDefinedByInstance[_, _]], true)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is reasonable. I will try to make them as Predicate in another PR later.

classOf[UserDefinedByInstance[_, _]], true)
checkFilterPredicate(('_1 in(false)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], false)
}
}

Expand All @@ -138,6 +143,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))

checkFilterPredicate(('_1 in(1, 2)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(1), Row(2)))
checkFilterPredicate(('_1 in(3, 4)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(3), Row(4)))
}
}

Expand All @@ -164,6 +174,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))

checkFilterPredicate(('_1 in(1L, 2L)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(1L), Row(2L)))
checkFilterPredicate(('_1 in(3L, 4L)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(3L), Row(4L)))
}
}

Expand All @@ -190,6 +205,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))

checkFilterPredicate(('_1 in(1.0f, 2.0f)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0f), Row(2.0f)))
checkFilterPredicate(('_1 in(3.0f, 4.0f)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0f), Row(4.0f)))
}
}

Expand All @@ -216,6 +236,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))

checkFilterPredicate(('_1 in(1.0, 2.0)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(1.0), Row(2.0)))
checkFilterPredicate(('_1 in(3.0, 4.0)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(3.0), Row(4.0)))
}
}

Expand Down Expand Up @@ -244,6 +269,31 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex

checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))

checkFilterPredicate(('_1 in("1", "2")).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row("1"), Row("2")))
checkFilterPredicate(('_1 in("3", "4")).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row("3"), Row("4")))
}

withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString * 5 + "test"))) { implicit df =>
checkFilterPredicate(('_1 contains "11").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], "11111test")

checkFilterPredicate(('_1 contains "2test").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], "22222test")

checkFilterPredicate(('_1 contains "3t").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], "33333test")

checkFilterPredicate(('_1 startsWith "22").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], "22222test")

checkFilterPredicate(('_1 endsWith "4test").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], "44444test")

checkFilterPredicate(('_1 endsWith "2test").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], "22222test")
}
}

Expand Down Expand Up @@ -278,6 +328,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
checkBinaryFilterPredicate(
'_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))

checkFilterPredicate(('_1 in(1.b, 2.b)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(1.b), Row(2.b)))
checkFilterPredicate(('_1 in(3.b, 4.b)).asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]], Seq(Row(3.b), Row(4.b)))
}
}

Expand Down