Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.v2

import java.io.IOException
import java.util.UUID

import scala.collection.JavaConverters._
Expand All @@ -27,7 +26,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
Expand All @@ -46,12 +45,6 @@ abstract class FileWriteBuilder(
private val schema = info.schema()
private val queryId = info.queryId()
private val options = info.options()
private var mode: SaveMode = _

def mode(mode: SaveMode): WriteBuilder = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yaooqinn . Technically, removal of dead code is not a bug fix. Please re-category this as an Improvement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thanks

this.mode = mode
this
}

override def buildForBatch(): BatchWrite = {
val sparkSession = SparkSession.active
Expand All @@ -68,26 +61,8 @@ abstract class FileWriteBuilder(
lazy val description =
createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)

val fs = path.getFileSystem(hadoopConf)
mode match {
case SaveMode.ErrorIfExists if fs.exists(path) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you give me the pointer where handles this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g.

case (SaveMode.Append, Some(table)) =>

val qualifiedOutputPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")

case SaveMode.Ignore if fs.exists(path) =>
null

case SaveMode.Overwrite =>
if (fs.exists(path) && !committer.deleteWithJob(fs, path, true)) {
throw new IOException(s"Unable to clear directory $path prior to writing to it")
}
committer.setupJob(job)
new FileBatchWrite(job, description, committer)

case _ =>
committer.setupJob(job)
new FileBatchWrite(job, description, committer)
}
committer.setupJob(job)
new FileBatchWrite(job, description, committer)
}

/**
Expand All @@ -104,7 +79,6 @@ abstract class FileWriteBuilder(
private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = {
assert(schema != null, "Missing input data schema")
assert(queryId != null, "Missing query ID")
assert(mode != null, "Missing save mode")

if (paths.length != 1) {
throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
Expand Down