Commit d6521ea

rxin authored and yhuai committed
[SPARK-18917][SC-5624] Remove schema check in appending data
In append mode, we check whether the schema of the write is compatible with the schema of the existing data. Finding the existing schema for files can be a significant performance issue in cloud environments, so this patch removes the check. Note that for catalog tables we always do the check, as discussed in apache#16339 (comment).

backport apache#16622 to our internal 2.1 branch

Author: Reynold Xin <[email protected]>

Closes apache#178 from cloud-fan/backport.
1 parent 0eb17f8 commit d6521ea
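
For context, a minimal sketch (not part of the patch; the path and table name are hypothetical) of the two append paths the commit message distinguishes:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object AppendSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("append-sketch").getOrCreate()
        import spark.implicits._

        val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

        // Path-based append: after this patch, Spark no longer lists and reads the
        // existing files to infer a schema to compare against -- the expensive step
        // on object stores such as S3.
        df.write.mode(SaveMode.Append).parquet("s3a://bucket/events")  // hypothetical path

        // Catalog-table append: the schema check still happens, because the catalog
        // already holds the table's schema and no file listing is needed.
        df.write.mode(SaveMode.Append).saveAsTable("events")  // hypothetical table

        spark.stop()
      }
    }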

File tree

1 file changed: +3 −33 lines changed

  • sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 3 additions & 33 deletions
@@ -104,20 +104,12 @@ case class DataSource(
    * dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
    * this information, therefore calls to this method should be very cheap, i.e. there won't
    * be any further inference in any triggers.
-   * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
-   *    existing table's partitioning scheme. This is achieved by not providing
-   *    `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
-   *    exit, if we don't care about the schema of the original table.
    *
    * @param format the file format object for this DataSource
-   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
-   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
-   *         `null`.
+   *         columns.
    */
-  private def getOrInferFileFormatSchema(
-      format: FileFormat,
-      justPartitioning: Boolean = false): (StructType, StructType) = {
+  private def getOrInferFileFormatSchema(format: FileFormat): (StructType, StructType) = {
     // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
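
With `justPartitioning` gone, the method unconditionally computes and returns both schemas. A sketch of what a call site looks like after the change (illustrative, not taken from the diff):

    // Both schemas are now always materialized; former justPartitioning = true
    // callers that discarded the data schema are removed by this patch.
    val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format)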
@@ -174,9 +166,7 @@ case class DataSource(
         StructType(partitionFields)
       }
     }
-    if (justPartitioning) {
-      return (null, partitionSchema)
-    }
+
     val dataSchema = userSpecifiedSchema.map { schema =>
       val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
@@ -435,26 +425,6 @@ case class DataSource(
     PartitioningUtils.validatePartitionColumn(
       data.schema, partitionColumns, caseSensitive)
 
-    // If we are appending to a table that already exists, make sure the partitioning matches
-    // up. If we fail to load the table for whatever reason, ignore the check.
-    if (mode == SaveMode.Append) {
-      val existingPartitionColumns = Try {
-        getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
-      }.getOrElse(Seq.empty[String])
-      // TODO: Case sensitivity.
-      val sameColumns =
-        existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-      if (existingPartitionColumns.nonEmpty && !sameColumns) {
-        throw new AnalysisException(
-          s"""Requested partitioning does not match existing partitioning.
-             |Existing partitioning columns:
-             |  ${existingPartitionColumns.mkString(", ")}
-             |Requested partitioning columns:
-             |  ${partitionColumns.mkString(", ")}
-             |""".stripMargin)
-      }
-    }
-
     // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
     // not need to have the query as child, to avoid to analyze an optimized query,
     // because InsertIntoHadoopFsRelationCommand will be optimized first.
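
One observable consequence of the removed block, sketched below under stated assumptions (hypothetical path and column names; imports as in the first sketch, plus a `SparkSession` named `spark`): before this commit the second write threw AnalysisException ("Requested partitioning does not match existing partitioning ..."); after it, the path-based append performs no such comparison at this point.

    import spark.implicits._
    val events = Seq((2016, 12, "click"), (2017, 1, "view")).toDF("year", "month", "action")

    // First write establishes a year-partitioned layout under the path.
    events.write.partitionBy("year").parquet("/tmp/events")

    // Appending with different partition columns is no longer detected here.
    events.write.mode(SaveMode.Append).partitionBy("month").parquet("/tmp/events")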
