Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,16 @@ abstract class FileTable(
val partitionSchema = fileIndex.partitionSchema
SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
"in the partition schema", caseSensitive)
PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
partitionSchema, caseSensitive)._1
val partitionNameSet: Set[String] =
partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet

// When data and partition schemas have overlapping columns,
// tableSchema = dataSchema - overlapSchema + partitionSchema
val fields = dataSchema.fields.filterNot { field =>
val colName = PartitioningUtils.getColName(field, caseSensitive)
partitionNameSet.contains(colName)
} ++ partitionSchema.fields
StructType(fields)
}

override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
}
}
}

test("Return correct results when data columns overlap with partition columns") {
Seq("parquet", "orc", "json").foreach { format =>
withTempPath { path =>
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
Seq((1, 2, 3, 4, 5)).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
.write.format(format).save(tablePath.getCanonicalPath)

val df = spark.read.format(format).load(path.getCanonicalPath)
.select("CoL1", "Col2", "CoL5", "CoL3")
checkAnswer(df, Row("a", 2, "e", "c"))
}
}
}
}

object TestingUDT {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,68 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}

test("read partitioned table - with nulls") {
withTempDir { base =>
for {
// Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
pi <- Seq(1, null.asInstanceOf[Integer])
ps <- Seq("foo", null.asInstanceOf[String])
} {
makeOrcFile(
(1 to 10).map(i => OrcParData(i, i.toString)),
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

spark.read
.option("hive.exec.default.partition.name", defaultPartitionName)
.orc(base.getCanonicalPath)
.createOrReplaceTempView("t")

withTempTable("t") {
checkAnswer(
sql("SELECT * FROM t"),
for {
i <- 1 to 10
pi <- Seq(1, null.asInstanceOf[Integer])
ps <- Seq("foo", null.asInstanceOf[String])
} yield Row(i, i.toString, pi, ps))

checkAnswer(
sql("SELECT * FROM t WHERE pi IS NULL"),
for {
i <- 1 to 10
ps <- Seq("foo", null.asInstanceOf[String])
} yield Row(i, i.toString, null, ps))

checkAnswer(
sql("SELECT * FROM t WHERE ps IS NULL"),
for {
i <- 1 to 10
pi <- Seq(1, null.asInstanceOf[Integer])
} yield Row(i, i.toString, pi, null))
}
}
}

test("SPARK-27162: handle pathfilter configuration correctly") {
withTempPath { dir =>
val path = dir.getCanonicalPath

val df = spark.range(2)
df.write.orc(path + "/p=1")
df.write.orc(path + "/p=2")
assert(spark.read.orc(path).count() === 4)

val extraOptions = Map(
"mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
"mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
)
assert(spark.read.options(extraOptions).orc(path).count() === 2)
}
}
}

class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
test("read partitioned table - partition key included in orc file") {
withTempDir { base =>
for {
Expand All @@ -127,7 +189,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
i <- 1 to 10
pi <- Seq(1, 2)
ps <- Seq("foo", "bar")
} yield Row(i, pi, i.toString, ps))
} yield Row(i, i.toString, pi, ps))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we testing v1 or v2 here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V2.
For V1 we use OrcV1PartitionDiscoverySuite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should put V2 in the test suite name as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not quite related to this PR. If we are going to use V2 by default, I think the current test suite name is OK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't V1 by default now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For read path, it is V2 by default now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make a followup PR to put V2 in the test suite name and do not rely on the default config values.


checkAnswer(
sql("SELECT intField, pi FROM t"),
Expand All @@ -142,28 +204,26 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
for {
i <- 1 to 10
ps <- Seq("foo", "bar")
} yield Row(i, 1, i.toString, ps))
} yield Row(i, i.toString, 1, ps))

checkAnswer(
sql("SELECT * FROM t WHERE ps = 'foo'"),
for {
i <- 1 to 10
pi <- Seq(1, 2)
} yield Row(i, pi, i.toString, "foo"))
} yield Row(i, i.toString, pi, "foo"))
}
}
}


test("read partitioned table - with nulls") {
test("read partitioned table - with nulls and partition keys are included in Orc file") {
withTempDir { base =>
for {
// Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
pi <- Seq(1, null.asInstanceOf[Integer])
pi <- Seq(1, 2)
ps <- Seq("foo", null.asInstanceOf[String])
} {
makeOrcFile(
(1 to 10).map(i => OrcParData(i, i.toString)),
(1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

Expand All @@ -177,23 +237,71 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
sql("SELECT * FROM t"),
for {
i <- 1 to 10
pi <- Seq(1, null.asInstanceOf[Integer])
pi <- Seq(1, 2)
ps <- Seq("foo", null.asInstanceOf[String])
} yield Row(i, i.toString, pi, ps))

checkAnswer(
sql("SELECT * FROM t WHERE pi IS NULL"),
sql("SELECT * FROM t WHERE ps IS NULL"),
for {
i <- 1 to 10
ps <- Seq("foo", null.asInstanceOf[String])
} yield Row(i, i.toString, null, ps))
pi <- Seq(1, 2)
} yield Row(i, i.toString, pi, null))
}
}
}
}

class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
.set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")

test("read partitioned table - partition key included in orc file") {
withTempDir { base =>
for {
pi <- Seq(1, 2)
ps <- Seq("foo", "bar")
} {
makeOrcFile(
(1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
}

spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")

withTempTable("t") {
checkAnswer(
sql("SELECT * FROM t WHERE ps IS NULL"),
sql("SELECT * FROM t"),
for {
i <- 1 to 10
pi <- Seq(1, null.asInstanceOf[Integer])
} yield Row(i, i.toString, pi, null))
pi <- Seq(1, 2)
ps <- Seq("foo", "bar")
} yield Row(i, pi, i.toString, ps))

checkAnswer(
sql("SELECT intField, pi FROM t"),
for {
i <- 1 to 10
pi <- Seq(1, 2)
_ <- Seq("foo", "bar")
} yield Row(i, pi))

checkAnswer(
sql("SELECT * FROM t WHERE pi = 1"),
for {
i <- 1 to 10
ps <- Seq("foo", "bar")
} yield Row(i, 1, i.toString, ps))

checkAnswer(
sql("SELECT * FROM t WHERE ps = 'foo'"),
for {
i <- 1 to 10
pi <- Seq(1, 2)
} yield Row(i, pi, i.toString, "foo"))
}
}
}
Expand Down Expand Up @@ -232,31 +340,4 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
}
}
}

test("SPARK-27162: handle pathfilter configuration correctly") {
withTempPath { dir =>
val path = dir.getCanonicalPath

val df = spark.range(2)
df.write.orc(path + "/p=1")
df.write.orc(path + "/p=2")
assert(spark.read.orc(path).count() === 4)

val extraOptions = Map(
"mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
"mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
)
assert(spark.read.options(extraOptions).orc(path).count() === 2)
}
}
}

class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext

class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
.set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
}