diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index d7284fd05ad3d..cb816d69b68c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -70,8 +70,16 @@ abstract class FileTable(
     val partitionSchema = fileIndex.partitionSchema
     SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
       "in the partition schema", caseSensitive)
-    PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
-      partitionSchema, caseSensitive)._1
+    val partitionNameSet: Set[String] =
+      partitionSchema.fields.map(PartitioningUtils.getColName(_, caseSensitive)).toSet
+
+    // When data and partition schemas have overlapping columns,
+    // tableSchema = dataSchema - overlapSchema + partitionSchema
+    val fields = dataSchema.fields.filterNot { field =>
+      val colName = PartitioningUtils.getColName(field, caseSensitive)
+      partitionNameSet.contains(colName)
+    } ++ partitionSchema.fields
+    StructType(fields)
   }
 
   override def capabilities(): java.util.Set[TableCapability] = FileTable.CAPABILITIES
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 1d30cbfbaf1a5..add8a306a9e08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -493,6 +493,20 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("Return correct results when data columns overlap with partition columns") {
+    Seq("parquet", "orc", "json").foreach { format =>
+      withTempPath { path =>
+        val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq((1, 2, 3, 4, 5)).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+          .write.format(format).save(tablePath.getCanonicalPath)
+
+        val df = spark.read.format(format).load(path.getCanonicalPath)
+          .select("CoL1", "Col2", "CoL5", "CoL3")
+        checkAnswer(df, Row("a", 2, "e", "c"))
+      }
+    }
+  }
 }
 
 object TestingUDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
index e1d02540a7454..143e3f0997201 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcPartitionDiscoverySuite.scala
@@ -107,20 +107,82 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
     }
   }
 
+  test("read partitioned table - with nulls") {
+    withTempDir { base =>
+      for {
+        // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
+        pi <- Seq(1, null.asInstanceOf[Integer])
+        ps <- Seq("foo", null.asInstanceOf[String])
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParData(i, i.toString)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read
+        .option("hive.exec.default.partition.name", defaultPartitionName)
+        .orc(base.getCanonicalPath)
+        .createOrReplaceTempView("t")
+
+      withTempTable("t") {
+        checkAnswer(
+          sql("SELECT * FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, pi, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi IS NULL"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", null.asInstanceOf[String])
+          } yield Row(i, i.toString, null, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps IS NULL"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, null.asInstanceOf[Integer])
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+
+  test("SPARK-27162: handle pathfilter configuration correctly") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = spark.range(2)
+      df.write.orc(path + "/p=1")
+      df.write.orc(path + "/p=2")
+      assert(spark.read.orc(path).count() === 4)
+
+      val extraOptions = Map(
+        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
+        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
+      )
+      assert(spark.read.options(extraOptions).orc(path).count() === 2)
+    }
+  }
+}
+
+class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
   test("read partitioned table - partition key included in orc file") {
     withTempDir { base =>
       for {
         pi <- Seq(1, 2)
         ps <- Seq("foo", "bar")
       } {
         makeOrcFile(
           (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
       spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
 
       withTempTable("t") {
         checkAnswer(
           sql("SELECT * FROM t"),
           for {
@@ -127,7 +189,7 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
             i <- 1 to 10
             pi <- Seq(1, 2)
             ps <- Seq("foo", "bar")
-          } yield Row(i, pi, i.toString, ps))
+          } yield Row(i, i.toString, pi, ps))
 
         checkAnswer(
           sql("SELECT intField, pi FROM t"),
@@ -142,28 +204,26 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
           for {
             i <- 1 to 10
             ps <- Seq("foo", "bar")
-          } yield Row(i, 1, i.toString, ps))
+          } yield Row(i, i.toString, 1, ps))
 
         checkAnswer(
           sql("SELECT * FROM t WHERE ps = 'foo'"),
           for {
             i <- 1 to 10
             pi <- Seq(1, 2)
-          } yield Row(i, pi, i.toString, "foo"))
+          } yield Row(i, i.toString, pi, "foo"))
       }
     }
   }
-
-  test("read partitioned table - with nulls") {
+  test("read partitioned table - with nulls and partition keys are included in Orc file") {
     withTempDir { base =>
       for {
-        // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero...
-        pi <- Seq(1, null.asInstanceOf[Integer])
+        pi <- Seq(1, 2)
         ps <- Seq("foo", null.asInstanceOf[String])
       } {
         makeOrcFile(
-          (1 to 10).map(i => OrcParData(i, i.toString)),
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
       spark.read
@@ -177,23 +237,71 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
           sql("SELECT * FROM t"),
           for {
             i <- 1 to 10
-            pi <- Seq(1, null.asInstanceOf[Integer])
+            pi <- Seq(1, 2)
             ps <- Seq("foo", null.asInstanceOf[String])
           } yield Row(i, i.toString, pi, ps))
 
         checkAnswer(
-          sql("SELECT * FROM t WHERE pi IS NULL"),
+          sql("SELECT * FROM t WHERE ps IS NULL"),
           for {
             i <- 1 to 10
-            ps <- Seq("foo", null.asInstanceOf[String])
-          } yield Row(i, i.toString, null, ps))
+            pi <- Seq(1, 2)
+          } yield Row(i, i.toString, pi, null))
+      }
+    }
+  }
+}
+
+class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+
+  test("read partitioned table - partition key included in orc file") {
+    withTempDir { base =>
+      for {
+        pi <- Seq(1, 2)
+        ps <- Seq("foo", "bar")
+      } {
+        makeOrcFile(
+          (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)),
+          makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+      }
+
+      spark.read.orc(base.getCanonicalPath).createOrReplaceTempView("t")
+
+      withTempTable("t") {
-
         checkAnswer(
-          sql("SELECT * FROM t WHERE ps IS NULL"),
+          sql("SELECT * FROM t"),
           for {
             i <- 1 to 10
-            pi <- Seq(1, null.asInstanceOf[Integer])
-          } yield Row(i, i.toString, pi, null))
+            pi <- Seq(1, 2)
+            ps <- Seq("foo", "bar")
+          } yield Row(i, pi, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT intField, pi FROM t"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+            _ <- Seq("foo", "bar")
+          } yield Row(i, pi))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE pi = 1"),
+          for {
+            i <- 1 to 10
+            ps <- Seq("foo", "bar")
+          } yield Row(i, 1, i.toString, ps))
+
+        checkAnswer(
+          sql("SELECT * FROM t WHERE ps = 'foo'"),
+          for {
+            i <- 1 to 10
+            pi <- Seq(1, 2)
+          } yield Row(i, pi, i.toString, "foo"))
       }
     }
   }
@@ -232,31 +340,4 @@ abstract class OrcPartitionDiscoveryTest extends OrcTest {
       }
     }
   }
-
-  test("SPARK-27162: handle pathfilter configuration correctly") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-
-      val df = spark.range(2)
-      df.write.orc(path + "/p=1")
-      df.write.orc(path + "/p=2")
-      assert(spark.read.orc(path).count() === 4)
-
-      val extraOptions = Map(
-        "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
-        "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
-      )
-      assert(spark.read.options(extraOptions).orc(path).count() === 2)
-    }
-  }
-}
-
-class OrcPartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext
-
-class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with SharedSQLContext {
-  override protected def sparkConf: SparkConf =
-    super
-      .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
 }
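
Note (not part of the patch): the `FileTable` change above defines the resolved table schema as `tableSchema = dataSchema - overlapSchema + partitionSchema`, i.e. a data column that collides with a partition column is dropped from the data schema, the partition column is appended at the end, and the partition value wins on read. Below is a minimal, hypothetical sketch of that behavior against a Spark build containing this patch; the object name, temp-directory handling, and column names are illustrative only, not part of the change.

```scala
import org.apache.spark.sql.SparkSession

object OverlapExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("overlap").getOrCreate()
    import spark.implicits._

    val base = java.nio.file.Files.createTempDirectory("overlap").toString
    // "p" exists both as a data column (value 1) and as a partition directory (value "a").
    Seq((1, 2)).toDF("p", "b").write.parquet(s"$base/p=a")

    val df = spark.read.parquet(base)
    // Under the patched schema rule, the overlapping data column "p" is dropped and the
    // partition column is appended last, so the schema is (b, p) and "a" wins over 1.
    df.printSchema() // b: integer, p: string
    df.show()        // single row: b = 2, p = "a"

    spark.stop()
  }
}
```

This mirrors the new `FileBasedDataSourceSuite` test, which writes under `cOl3=c/cOl1=a/cOl5=e` and expects the partition values `"a"`, `"e"`, `"c"` back rather than the integers stored in the data files.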