`OrcFilters.scala`:

```diff
@@ -33,13 +33,13 @@ import org.apache.spark.sql.sources._
 private[orc] object OrcFilters extends Logging {
   def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
     expr.reduceOption(And).flatMap { conjunction =>
-      val builder = SearchArgumentFactory.newBuilder()
-      buildSearchArgument(conjunction, builder).map(_.build())
+      val builder = SearchArgumentFactory.newBuilder().startAnd()
+      buildSearchArgument(conjunction, builder).map(_.end().build())
     }
   }
 
   private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
-    def newBuilder = SearchArgumentFactory.newBuilder()
+    def newBuilder = SearchArgumentFactory.newBuilder().startAnd()
 
     def isSearchableLiteral(value: Any): Boolean = value match {
       // These are types recognized by the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
@@ -72,8 +72,8 @@ private[orc] object OrcFilters extends Logging {
     expression match {
       case And(left, right) =>
-        val tryLeft = buildSearchArgument(left, newBuilder)
-        val tryRight = buildSearchArgument(right, newBuilder)
+        val tryLeft = buildSearchArgument(left, newBuilder).map(_.end)
+        val tryRight = buildSearchArgument(right, newBuilder).map(_.end)
 
         val conjunction = for {
           _ <- tryLeft
@@ -90,15 +90,15 @@ private[orc] object OrcFilters extends Logging {
       case Or(left, right) =>
         for {
-          _ <- buildSearchArgument(left, newBuilder)
-          _ <- buildSearchArgument(right, newBuilder)
+          _ <- buildSearchArgument(left, newBuilder).map(_.end)
+          _ <- buildSearchArgument(right, newBuilder).map(_.end)
           lhs <- buildSearchArgument(left, builder.startOr())
           rhs <- buildSearchArgument(right, lhs)
         } yield rhs.end()
 
       case Not(child) =>
         for {
-          _ <- buildSearchArgument(child, newBuilder)
+          _ <- buildSearchArgument(child, newBuilder).map(_.end())
           negate <- buildSearchArgument(child, builder.startNot())
         } yield negate.end()
```
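For context, a minimal sketch of the builder discipline the change above enforces (assuming the Hive 1.x `SearchArgumentFactory`/`Builder` API this code compiles against; the column names and literals are illustrative only): every `startAnd()`/`startOr()`/`startNot()` must be closed by a matching `end()` before `build()` is called, which is why the top-level conjunction is now wrapped in `startAnd()` ... `end()`.

```scala
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}

// Builds the predicate: age < 30 AND NOT (name = 'name_1').
// Each start*() opens a group; build() fails if any group is left
// unclosed, hence the explicit end() calls added in the patch above.
val sarg: SearchArgument = SearchArgumentFactory
  .newBuilder()
  .startAnd()
  .lessThan("age", 30L)        // integral literals are boxed as java.lang.Long
  .startNot()
  .equals("name", "name_1")
  .end()                       // closes startNot()
  .end()                       // closes startAnd()
  .build()
```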
`OrcQuerySuite.scala`:

```diff
@@ -101,7 +101,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
-  test("Simple selection form ORC table") {
+  def simpleSelection(): Unit = {
     val data = (1 to 10).map { i =>
       Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
     }
@@ -147,6 +147,16 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("Simple selection form ORC table") {
+    simpleSelection()
+  }
+
+  test("Simple selection form ORC table with PPD") {
+    withPPD {
+      simpleSelection
+    }
+  }
+
   test("save and load case class RDD with `None`s as orc") {
     val data = (
       None: Option[Int],
```
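The refactoring runs the same selection body twice, once with ORC predicate pushdown enabled. For reference, a hypothetical query shape that would exercise that path (assuming the `Person` case class above exposes `name` and `age` columns; `path` is a placeholder):

```scala
// With spark.sql.orc.filterPushdown=true, OrcFilters.createFilter can turn
// this predicate into a SearchArgument that the ORC reader evaluates against
// stripe/row-group statistics, skipping data that cannot match.
val df = sqlContext.read.orc(path).filter("age < 5 AND name = 'name_3'")
df.collect()
```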
`OrcTest.scala`:

```diff
@@ -42,6 +42,12 @@ private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  protected def withPPD(f: () => Unit) {
+    sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
+    f()
+    sqlContext.setConf("spark.sql.orc.filterPushdown", "false")
+  }
+
   /**
    * Writes `data` to a Orc file and reads it back as a [[DataFrame]],
    * which is then passed to `f`. The Orc file will be deleted after `f` returns.
```

A reviewer (Contributor) commented on the added `withPPD` helper:

> You can use `SQLTestUtils.withSQLConf` instead.
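A sketch of what the reviewer's suggestion could look like (a hypothetical rewrite, assuming the `withSQLConf(pairs: (String, String)*)(f: => Unit)` helper from Spark's `SQLTestUtils`, which `OrcTest` already mixes in): unlike `withPPD`, which resets the flag unconditionally to `"false"` and skips the reset entirely if the body throws, `withSQLConf` restores the previous value afterwards.

```scala
// Hypothetical replacement for the withPPD-based test, using the
// withSQLConf helper OrcTest inherits from SQLTestUtils; it saves the
// old value, sets the new one, and restores the old value when done,
// so a failing test cannot leak the setting into other tests.
test("Simple selection form ORC table with PPD") {
  withSQLConf("spark.sql.orc.filterPushdown" -> "true") {
    simpleSelection()
  }
}
```

This would also remove the need for the `withPPD` helper altogether.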