diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
index 7c72495548e3..d5e89beaf42f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala
@@ -40,7 +40,7 @@ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPl
       val relation = HadoopFsRelation(
         table.fileIndex,
         table.fileIndex.partitionSchema,
-        table.schema(),
+        table.schema,
         None,
         v1FileFormat,
         d.options.asScala.toMap)(sparkSession)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index bdd6a48df20c..6ab5c4b269b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -23,11 +23,13 @@ import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, Scan}
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
-    readSchema: StructType) extends Scan with Batch {
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap) extends Scan with Batch {
   /**
    * Returns whether a file with `path` could be split or not.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index 5dd343ba44b6..d4e55a50307d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -21,8 +21,7 @@ import org.apache.spark.sql.types.StructType
 
 abstract class FileScanBuilder(schema: StructType) extends ScanBuilder
-  with SupportsPushDownRequiredColumns
-  with SupportsPushDownFilters {
+  with SupportsPushDownRequiredColumns {
   protected var readSchema = schema
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 08873a3b5a64..9545a5fbd2e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.SchemaUtils
 
 abstract class FileTable(
     sparkSession: SparkSession,
@@ -52,10 +53,15 @@ abstract class FileTable(
       s"Unable to infer schema for $name. It must be specified manually.")
   }.asNullable
 
-  override def schema(): StructType = {
+  override lazy val schema: StructType = {
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    SchemaUtils.checkColumnNameDuplication(dataSchema.fieldNames,
+      "in the data schema", caseSensitive)
+    val partitionSchema = fileIndex.partitionSchema
+    SchemaUtils.checkColumnNameDuplication(partitionSchema.fieldNames,
+      "in the partition schema", caseSensitive)
     PartitioningUtils.mergeDataAndPartitionSchema(dataSchema,
-      fileIndex.partitionSchema, caseSensitive)._1
+      partitionSchema, caseSensitive)._1
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index e16ee4c460f3..0d07f5a02c25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.SerializableConfiguration
 
 abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String])
@@ -60,10 +61,11 @@ abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[St
   }
 
   override def buildForBatch(): BatchWrite = {
-    validateInputs()
-    val path = new Path(paths.head)
     val sparkSession = SparkSession.active
+    validateInputs(sparkSession.sessionState.conf.caseSensitiveAnalysis)
+    val path = new Path(paths.head)
     val optionsAsScala = options.asScala.toMap
+
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
     val job = getJobInstance(hadoopConf, path)
     val committer = FileCommitProtocol.instantiate(
@@ -122,12 +124,20 @@
    */
   def formatName: String
 
-  private def validateInputs(): Unit = {
+  private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
     assert(schema != null, "Missing input data schema")
     assert(queryId != null, "Missing query ID")
     assert(mode != null, "Missing save mode")
-    assert(paths.length == 1)
+
+    if (paths.length != 1) {
+      throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+        s"got: ${paths.mkString(", ")}")
+    }
+    val pathName = paths.head
+    SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name),
+      s"when inserting into $pathName", caseSensitiveAnalysis)
     DataSource.validateSchema(schema)
+
     schema.foreach { field =>
       if (!supportsDataType(field.dataType)) {
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 3c5dc1f50d7e..237eadb698b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
 case class OrcScan(
@@ -31,7 +32,9 @@ case class OrcScan(
     hadoopConf: Configuration,
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType,
-    readSchema: StructType) extends FileScan(sparkSession, fileIndex, readSchema) {
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScan(sparkSession, fileIndex, readSchema, options) {
   override def isSplitable(path: Path): Boolean = true
 
   override def createReaderFactory(): PartitionReaderFactory = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index 0b153416b7bb..4767f21bb029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.execution.datasources.orc.OrcFilters
 import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.sources.v2.reader.{Scan, SupportsPushDownFilters}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -35,11 +35,12 @@ case class OrcScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     schema: StructType,
     dataSchema: StructType,
-    options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) {
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(schema) with SupportsPushDownFilters {
   lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap)
 
   override def build(): Scan = {
-    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
+    OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema, options)
   }
 
   private var _pushedFilters: Array[Filter] = Array.empty
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index e45ab19aadbf..9f969473da61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -838,6 +838,12 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
         checkReadUserSpecifiedDataColumnDuplication(
           Seq((1, 1)).toDF("c0", "c1"), "parquet", c0, c1, src)
         checkReadPartitionColumnDuplication("parquet", c0, c1, src)
+
+        // Check ORC format
+        checkWriteDataColumnDuplication("orc", c0, c1, src)
+        checkReadUserSpecifiedDataColumnDuplication(
+          Seq((1, 1)).toDF("c0", "c1"), "orc", c0, c1, src)
+        checkReadPartitionColumnDuplication("orc", c0, c1, src)
       }
     }
   }
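
For reference, a minimal sketch (not part of the patch) of the user-facing behavior that the new SchemaUtils.checkColumnNameDuplication call in FileWriteBuilder.validateInputs is meant to enforce on the ORC v2 write path. The object name, application name, output path, and column names below are illustrative only:

```scala
// Illustrative sketch only: duplicate data columns should now be rejected
// during validateInputs(), before the ORC v2 write path launches any job.
import org.apache.spark.sql.{AnalysisException, SparkSession}

object OrcDuplicateColumnCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("orc-duplicate-column-check") // illustrative app name
      .getOrCreate()
    import spark.implicits._

    // "c0" and "C0" collide under the default case-insensitive analysis.
    val df = Seq((1, 2)).toDF("c0", "C0")

    try {
      df.write.format("orc").save("/tmp/orc-dup-check") // illustrative path
    } catch {
      // SchemaUtils.checkColumnNameDuplication reports duplicates as an
      // AnalysisException ("Found duplicate column(s) ...").
      case e: AnalysisException => println(s"Write rejected as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}
```

Under the default spark.sql.caseSensitive=false, "c0" and "C0" count as duplicates; with case sensitivity enabled the same write should go through, which is why validateInputs now takes the caseSensitiveAnalysis flag.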