
Commit 601d242

Merge pull request #13 from zhzhan/orc1: predicate pushdown support

2 parents: 1e0c1d9 + c5236ef

5 files changed (+178, -15 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 33 additions & 3 deletions
@@ -17,7 +17,9 @@

 package org.apache.spark.sql.hive

+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.mapreduce.Job

 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.types.StringType
 import org.apache.spark.sql.execution.{DescribeCommand, OutputFaker, SparkPlan}
 import org.apache.spark.sql.hive
 import org.apache.spark.sql.hive.execution._
-import org.apache.spark.sql.hive.orc.{WriteToOrcFile, InsertIntoOrcTable, OrcRelation, OrcTableScan}
+import org.apache.spark.sql.hive.orc._
 import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}

@@ -243,8 +245,36 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(table: OrcRelation, partition, child, overwrite) =>
         InsertIntoOrcTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters, relation: OrcRelation) =>
-        // TODO: need to implement predict push down.
-        val prunePushedDownFilters = identity[Seq[Expression]] _
+        val prunePushedDownFilters = {
+          OrcRelation.jobConf = sparkContext.hadoopConfiguration
+          if (ORC_FILTER_PUSHDOWN_ENABLED) {
+            val job = new Job(OrcRelation.jobConf)
+            val conf: Configuration = job.getConfiguration
+            logInfo("Orc push down filter enabled:" + filters)
+            (filters: Seq[Expression]) => {
+              val recordFilter = OrcFilters.createFilter(filters)
+              if (recordFilter.isDefined) {
+                logInfo("Parsed filters:" + recordFilter)
+                /**
+                 * To test this, set the following so that the reader
+                 * will not read the whole file even when it is small:
+                 * sparkContext.hadoopConfiguration.setInt(
+                 *   "mapreduce.input.fileinputformat.split.maxsize", 50)
+                 */
+                conf.set(SARG_PUSHDOWN, toKryo(recordFilter.get))
+                conf.setBoolean("hive.optimize.index.filter", true)
+                OrcRelation.jobConf = conf
+              }
+              // No matter whether ORC filters the rows or not, we still need the
+              // finer-grained filtering in the upper layer, so return all filters.
+              filters
+            }
+          } else {
+            identity[Seq[Expression]] _
+          }
+        }
         pruneFilterProject(
           projectList,
           filters,
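
A minimal usage sketch (not part of the patch) of how this pushdown path gets exercised, assuming a hypothetical table people_orc with an integer age column that resolves to an OrcRelation in this branch; the split-size trick comes from the inline comment above.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local", "orc-pushdown-demo")
val hiveContext = new HiveContext(sc)

// Shrink input splits (as the comment above suggests) to make it easier to observe
// that the reader skips data when the SearchArgument is pushed down.
sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.maxsize", 50)

// The WHERE clause arrives at the strategy as the Seq[Expression] that PhysicalOperation
// extracts; prunePushedDownFilters serializes it into the "sarg.pushdown" job property.
hiveContext.sql("SELECT name FROM people_orc WHERE age > 30").collect()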

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala

Lines changed: 125 additions & 0 deletions
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
+import org.apache.spark.Logging
+
+private[sql] object OrcFilters extends Logging {
+
+  def createFilter(expr: Seq[Expression]): Option[SearchArgument] = {
+    if (expr == null || expr.size == 0) return None
+    var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder())
+    sarg.get.startAnd()
+    expr.foreach {
+      x => {
+        sarg match {
+          case Some(s1) => sarg = createFilter(x, s1)
+          case _ => None
+        }
+      }
+    }
+    sarg match {
+      case Some(b) => Some(b.end.build)
+      case _ => None
+    }
+  }
+
+  def createFilter(expression: Expression, builder: Builder): Option[Builder] = {
+    expression match {
+      case p@And(left: Expression, right: Expression) => {
+        val b1 = builder.startAnd()
+        val b2 = createFilter(left, b1)
+        b2 match {
+          case Some(b) => val b3 = createFilter(right, b)
+            if (b3.isDefined) {
+              Some(b3.get.end)
+            } else {
+              None
+            }
+          case _ => None
+        }
+      }
+      case p@Or(left: Expression, right: Expression) => {
+        val b1 = builder.startOr()
+        val b2 = createFilter(left, b1)
+        b2 match {
+          case Some(b) => val b3 = createFilter(right, b)
+            if (b3.isDefined) {
+              Some(b3.get.end)
+            } else {
+              None
+            }
+          case _ => None
+        }
+      }
+      case p@EqualTo(left: Literal, right: NamedExpression) => {
+        val b1 = builder.equals(right.name, left.value)
+        Some(b1)
+      }
+      case p@EqualTo(left: NamedExpression, right: Literal) => {
+        val b1 = builder.equals(left.name, right.value)
+        Some(b1)
+      }
+      case p@LessThan(left: NamedExpression, right: Literal) => {
+        val b1 = builder.lessThan(left.name, right.value)
+        Some(b1)
+      }
+      case p@LessThan(left: Literal, right: NamedExpression) => {
+        val b1 = builder.startNot().lessThanEquals(right.name, left.value).end()
+        Some(b1)
+      }
+      case p@LessThanOrEqual(left: NamedExpression, right: Literal) => {
+        val b1 = builder.lessThanEquals(left.name, right.value)
+        Some(b1)
+      }
+      case p@LessThanOrEqual(left: Literal, right: NamedExpression) => {
+        val b1 = builder.startNot().lessThan(right.name, left.value).end()
+        Some(b1)
+      }
+      case p@GreaterThan(left: NamedExpression, right: Literal) => {
+        val b1 = builder.startNot().lessThanEquals(left.name, right.value).end()
+        Some(b1)
+      }
+      case p@GreaterThan(left: Literal, right: NamedExpression) => {
+        val b1 = builder.lessThanEquals(right.name, left.value)
+        Some(b1)
+      }
+      case p@GreaterThanOrEqual(left: NamedExpression, right: Literal) => {
+        val b1 = builder.startNot().lessThan(left.name, right.value).end()
+        Some(b1)
+      }
+      case p@GreaterThanOrEqual(left: Literal, right: NamedExpression) => {
+        val b1 = builder.lessThan(right.name, left.value)
+        Some(b1)
+      }
+      // TODO: test it
+      case p@EqualNullSafe(left: NamedExpression, right: NamedExpression) => {
+        val b1 = builder.nullSafeEquals(left.name, right.name)
+        Some(b1)
+      }
+      case p@In(left: NamedExpression, list: Seq[Literal]) => {
+        val b1 = builder.in(left.name, list.map(_.value).toArray)
+        Some(b1)
+      }
+      case _ => None
+    }
+  }
+}
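
For illustration (not in the commit), a small sketch of how createFilter maps Catalyst predicates onto a Hive SearchArgument; the age and name attributes are hypothetical, and since OrcFilters is private[sql] the snippet assumes it runs from within the org.apache.spark.sql packages.

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, GreaterThan, Literal}
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}
import org.apache.spark.sql.hive.orc.OrcFilters

val age = AttributeReference("age", IntegerType, nullable = true)()
val name = AttributeReference("name", StringType, nullable = true)()

// Catalyst form of `WHERE age > 30 AND name = 'alice'`, already split into conjuncts.
val predicates = Seq(GreaterThan(age, Literal(30)), EqualTo(name, Literal("alice")))

// createFilter wraps the conjuncts in a top-level startAnd()/end and builds the argument;
// the result is what HiveStrategies serializes with toKryo into the "sarg.pushdown" property.
val sarg: Option[SearchArgument] = OrcFilters.createFilter(predicates)
sarg.foreach(s => println(s))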

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala

Lines changed: 1 addition & 0 deletions
@@ -94,6 +94,7 @@ private[sql] object OrcRelation {
     }
     path
   }
+  var jobConf: Configuration = _
 }

 private[sql] object OrcFileOperator{
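
The new field is a shared, mutable handle: the planner in HiveStrategies writes a Configuration (possibly carrying the serialized SearchArgument) into OrcRelation.jobConf, and OrcTableScan.execute() later builds its Job from the same handle. A rough sketch of that hand-off, with a placeholder value and assuming code living in the same private[sql] scope:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

val conf = new Configuration()
conf.set("sarg.pushdown", "<kryo+base64-encoded SearchArgument>") // placeholder, see toKryo below
OrcRelation.jobConf = conf                       // planner side (HiveStrategies)
val job = new Job(OrcRelation.jobConf)           // scan side (OrcTableScan.execute)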

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala

Lines changed: 8 additions & 11 deletions
@@ -77,8 +77,7 @@ case class OrcTableScan(

   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
-    val job = new Job(sc.hadoopConfiguration)
-
+    val job = new Job(OrcRelation.jobConf) // sc.hadoopConfiguration)
     val conf: Configuration = job.getConfiguration
     relation.path.split(",").foreach { curPath =>
       val qualifiedPath = {
@@ -110,17 +109,15 @@ case class OrcTableScan(
    * @param conf
    */
   def addColumnIds(output: Seq[Attribute], relation: OrcRelation, conf: Configuration) {
-    val fieldIdMap = relation.output.map(_.name).zipWithIndex.toMap
-    val names = output.map(_.name)
-    val ids = output.map { att =>
-      val realName = att.name.toLowerCase(Locale.ENGLISH)
-      fieldIdMap.getOrElse(realName, -1)
-    }.filter(_ >= 0).map(_.asInstanceOf[Integer])

+    val ids =
+      output.map(a =>
+        relation.output.indexWhere(_.name == a.name): Integer)
+        .filter(_ >= 0)
+    val names = output.map(_.name)
     assert(ids.size == names.size, "columns id and name length does not match!")
-    if (ids != null && !ids.isEmpty) {
-      HiveShim.appendReadColumns(conf, ids, names)
-    }
+    val sorted = ids.zip(names).sorted
+    HiveShim.appendReadColumns(conf, sorted.map(_._1), sorted.map(_._2))
   }
 }
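
To make the rewritten addColumnIds logic concrete, here is an illustration using plain strings instead of Catalyst Attributes (the schema and requested columns are hypothetical; the real code boxes the ids as java.lang.Integer before handing them to HiveShim.appendReadColumns):

val relationColumns = Seq("id", "name", "age")   // relation.output.map(_.name)
val requested = Seq("age", "id")                 // output.map(_.name)

// indexWhere maps each requested column to its position in the relation schema
val ids = requested.map(n => relationColumns.indexWhere(_ == n)).filter(_ >= 0)  // List(2, 0)

// pair ids with names and sort by id, mirroring what the patched addColumnIds
// passes to HiveShim.appendReadColumns
val sorted = ids.zip(requested).sorted           // List((0, "id"), (2, "age"))
println(sorted)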

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala

Lines changed: 11 additions & 1 deletion
@@ -17,6 +17,9 @@

 package org.apache.spark.sql.hive

+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Output
+import org.apache.commons.codec.binary.Base64
 import org.apache.spark.sql.{SQLContext, SchemaRDD}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -46,6 +49,13 @@ package object orc {
   // for orc compression type, only take effect in hive 0.13.1
   val orcDefaultCompressVar = "hive.exec.orc.default.compress"
   // for prediction push down in hive-0.13.1, don't enable it
-  val ORC_FILTER_PUSHDOWN_ENABLED = false
+  val ORC_FILTER_PUSHDOWN_ENABLED = true
   val SARG_PUSHDOWN = "sarg.pushdown"
+
+  def toKryo(input: Any) = {
+    val out = new Output(4 * 1024, 10 * 1024 * 1024);
+    new Kryo().writeObject(out, input);
+    out.close();
+    Base64.encodeBase64String(out.toBytes());
+  }
 }
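
toKryo Kryo-serializes the SearchArgument and Base64-encodes it so it can travel as a plain string in the "sarg.pushdown" job property. A hypothetical decoding counterpart (not in the patch) shows the round trip:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Input
import org.apache.commons.codec.binary.Base64

def fromKryo[T](encoded: String, clazz: Class[T]): T = {
  val bytes = Base64.decodeBase64(encoded)         // undoes Base64.encodeBase64String
  new Kryo().readObject(new Input(bytes), clazz)   // undoes Kryo.writeObject
}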
