OrcFilters.scala
@@ -67,6 +67,16 @@ private[sql] object OrcFilters {
}
}

+ // Since ORC 1.5.0 (ORC-323), column names that contain `.` characters need to be quoted
+ // so that predicate pushdown can distinguish them from references to nested columns.
+ private def quoteAttributeNameIfNeeded(name: String): String = {
+ if (!name.contains("`") && name.contains(".")) {
Member:
Does this condition take backticks in column names into account? For instance:

>>> spark.range(1).toDF("abc`.abc").show()
+--------+
|abc`.abc|
+--------+
|       0|
+--------+

Member Author:
Thank you for the review. I'll consider that, too.

Member Author:
@HyukjinKwon. Actually, Spark 2.3.2 ORC (native/hive) doesn't support a backtick character in column names; it fails on the write operation. And although Spark 2.4.0 broadens the supported special characters in column names (e.g. `.` and `"`), the backtick character is not handled yet.

So I'll handle that one in a separate PR, since it's an improvement rather than a regression fix.

Also, cc @gatorsmile and @dbtsai.

Member Author:
For the ORC and Avro improvement, SPARK-25722 has been created.

s"`$name`"
} else {
name
}
}

/**
* Create ORC filter as a SearchArgument instance.
*/
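For illustration, here is a self-contained sketch of how the new helper behaves (the wrapping object and main method are hypothetical, added only so the snippet runs standalone; the quoting logic is copied from the diff above):

object QuoteAttributeExample {
  // Same logic as the quoteAttributeNameIfNeeded helper added in this hunk.
  private def quoteAttributeNameIfNeeded(name: String): String = {
    if (!name.contains("`") && name.contains(".")) s"`$name`" else name
  }

  def main(args: Array[String]): Unit = {
    println(quoteAttributeNameIfNeeded("col.dot.1")) // prints `col.dot.1` (dot present, so the name is backquoted)
    println(quoteAttributeNameIfNeeded("plain"))     // prints plain (no dot, returned unchanged)
    println(quoteAttributeNameIfNeeded("abc`.abc"))  // prints abc`.abc (contains a backtick, left untouched; see the discussion above)
  }
}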
@@ -178,38 +188,47 @@ private[sql] object OrcFilters {
// wrapped by a "parent" predicate (`And`, `Or`, or `Not`).

case EqualTo(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().equals(attribute, getType(attribute), castedValue).end())
+ Some(builder.startAnd().equals(quotedName, getType(attribute), castedValue).end())

case EqualNullSafe(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().nullSafeEquals(attribute, getType(attribute), castedValue).end())
+ Some(builder.startAnd().nullSafeEquals(quotedName, getType(attribute), castedValue).end())

case LessThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().lessThan(attribute, getType(attribute), castedValue).end())
+ Some(builder.startAnd().lessThan(quotedName, getType(attribute), castedValue).end())

case LessThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startAnd().lessThanEquals(attribute, getType(attribute), castedValue).end())
+ Some(builder.startAnd().lessThanEquals(quotedName, getType(attribute), castedValue).end())

case GreaterThan(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startNot().lessThanEquals(attribute, getType(attribute), castedValue).end())
+ Some(builder.startNot().lessThanEquals(quotedName, getType(attribute), castedValue).end())

case GreaterThanOrEqual(attribute, value) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValue = castLiteralValue(value, dataTypeMap(attribute))
- Some(builder.startNot().lessThan(attribute, getType(attribute), castedValue).end())
+ Some(builder.startNot().lessThan(quotedName, getType(attribute), castedValue).end())

case IsNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startAnd().isNull(attribute, getType(attribute)).end())
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ Some(builder.startAnd().isNull(quotedName, getType(attribute)).end())

case IsNotNull(attribute) if isSearchableType(dataTypeMap(attribute)) =>
- Some(builder.startNot().isNull(attribute, getType(attribute)).end())
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
+ Some(builder.startNot().isNull(quotedName, getType(attribute)).end())

case In(attribute, values) if isSearchableType(dataTypeMap(attribute)) =>
+ val quotedName = quoteAttributeNameIfNeeded(attribute)
val castedValues = values.map(v => castLiteralValue(v, dataTypeMap(attribute)))
- Some(builder.startAnd().in(attribute, getType(attribute),
+ Some(builder.startAnd().in(quotedName, getType(attribute),
castedValues.map(_.asInstanceOf[AnyRef]): _*).end())

case _ => None
OrcFilterSuite.scala
@@ -17,18 +17,20 @@

package org.apache.spark.sql.execution.datasources.orc

+ import java.io.File
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}

- import org.apache.spark.sql.{Column, DataFrame}
+ import org.apache.spark.sql.{Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation}
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

@@ -383,4 +385,17 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
)).get.toString
}
}

test("SPARK-25579 ORC PPD should support column names with dot") {
import testImplicits._
Member:
Can we add a test at OrcFilterSuite too?

Member Author:
Ur, this is OrcFilterSuite.

> Can we add a test at OrcFilterSuite too?

For HiveOrcFilterSuite, the Hive ORC implementation doesn't support dots in column names.

Member:
Okay. One end-to-end test should be enough.


+ withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+ withTempDir { dir =>
+ val path = new File(dir, "orc").getCanonicalPath
+ Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
Member:
How about explicitly repartitioning to make separate output files?

Member Author (@dongjoon-hyun, Oct 15, 2018):
We are using the default parallelism from TestSparkSession on two rows, and it already generates separate output files.

If you are concerned about possible flakiness, we can increase the number of rows to 10, call repartition(10), and check assert(actual < 10) as you did before. Do you want that?

Contributor:
Do not rely on implicit environment values; let's make the test as explicit as possible.

Member Author:
Sure. Thank you for the confirmation, @cloud-fan and @HyukjinKwon.
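A rough sketch of the more explicit variant discussed in this thread (not the code in this diff) could look like the following. It assumes the suite's existing helpers (withSQLConf, withTempDir, stripSparkFilter, and import testImplicits._) and simply writes ten rows across ten files before asserting that pushdown pruned some of them:

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  withTempDir { dir =>
    val path = new File(dir, "orc").getCanonicalPath
    // Ten rows, explicitly repartitioned into ten output files so stripe/file-level
    // pushdown has something to skip regardless of the session's default parallelism.
    (1 to 10).map(i => (i, i * 2)).toDF("col.dot.1", "col.dot.2")
      .repartition(10)
      .write.orc(path)
    val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2")
    // With the Spark-side filter stripped, surviving rows come only from the
    // files/stripes that ORC predicate pushdown could not eliminate.
    assert(stripSparkFilter(df).count() < 10)
  }
}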

+ val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2")
+ checkAnswer(stripSparkFilter(df), Row(1, 2))
Contributor (@cloud-fan, Oct 15, 2018):
To confirm, does this only work when (1, 2) and (3, 4) are in different row groups? (Not sure what the terminology is in ORC.)

Member:
Yup.

Member Author:
Yep. It works when they are in different stripes.

Member:
How do we generalize this to nested cases? The parent struct can contain a dot as well.

Contributor:
The ORC data source doesn't support nested column pruning yet.

Member Author (@dongjoon-hyun, Oct 16, 2018):
Thank you for the review, @dbtsai! I ignored PPD for nested columns here because Spark doesn't push down nested columns in 2.4, and still doesn't without your PR (#22573). With your PR, Spark 3.0 will support that, and we can update this code to handle those cases, too.

@cloud-fan: actually, ORC 1.5.0 started to support PPD for nested columns (ORC-323), so @dbtsai and I have discussed supporting that before. We plan to support ORC PPD for nested columns in Spark 3.0 without regression.
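As a purely hypothetical sketch of that direction (not part of this PR; the helper name and the Seq[String] path input are assumptions, since the current Filter API only exposes a flat attribute string), quoting could be applied per path segment once nested-column pushdown lands, so that a parent struct whose own name contains a dot stays distinguishable from real nesting:

// Hypothetical: `segments` would be the already-resolved field path,
// e.g. Seq("parent.struct", "leaf.col").
private def quoteNestedAttributeName(segments: Seq[String]): String =
  segments.map { segment =>
    if (!segment.contains("`") && segment.contains(".")) s"`$segment`" else segment
  }.mkString(".")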

+ }
+ }
+ }
}