@@ -24,7 +24,7 @@

/**
* The base interface for v2 data sources which don't have a real catalog. Implementations must
* have a public, 0-arg constructor.
* have a public, 0-arg constructor, and can only deal with existing tables.
Contributor:
The phrase "deal with" is not very specific. I think it would be better to say what these tables can be used for: data operations like read, append, delete, and overwrite. Not operations that require metadata changes.

* <p>
* The major responsibility of this interface is to return a {@link Table} for read/write.
* </p>

This file was deleted.

@@ -59,10 +59,6 @@ default WriteBuilder withInputDataSchema(StructType schema) {
* exception, data sources must overwrite this method to provide an implementation, if the
* {@link Table} that creates this write returns BATCH_WRITE support in its
* {@link Table#capabilities()}.
*
* Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean it up after removing
* {@link SupportsSaveMode}.
*/
default BatchWrite buildForBatch() {
throw new UnsupportedOperationException(getClass().getName() +
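To illustrate the contract described above (a sketch with assumed names, not code from this PR): a table that advertises BATCH_WRITE must supply its own buildForBatch(), along the lines of the NoopDataSource pattern that appears later in this diff.

import java.util
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical table: because capabilities() advertises BATCH_WRITE, the default
// buildForBatch() (which throws UnsupportedOperationException) must be overridden.
class ExampleTable(batchWrite: BatchWrite) extends Table with SupportsWrite {
  override def name(): String = "example"
  override def schema(): StructType = new StructType().add("i", "long")
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.BATCH_WRITE).asJava

  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
    new WriteBuilder {
      override def buildForBatch(): BatchWrite = batchWrite
    }
}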
41 changes: 21 additions & 20 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -30,11 +30,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable,
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, WriteToDataSourceV2}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -267,6 +266,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
// TODO: for backward compatibility reasons, the builtin file source needs to support all
Member:
Can we have an IDed TODO? TODO: -> TODO(SPARK-XXX):?

// the save modes, which violates the semantic of `TableProvider`. Here we special-case
Member:
nit. Here we special-case?

// file source and pass the save mode to file source directly. This hack can be removed
// after we figure out a general interface for path-based data sources.
case table: FileTable =>
Contributor:
Why is this hack necessary? Why not put off v2 support for path-based tables?

Contributor Author:
Because we want to migrate the file sources to v2, to validate the APIs of Table, ScanBuilder, WriteBuilder, etc.

This hack is just to work around the issue that we do not have a proper entry API for path-based data sources, which I believe we will have later on.

I think this is not a bad idea. We can unblock the file source migration, and we can keep DS v2 clean (FileTable is an internal class). Besides, this hack will be removed once we have a path-based API in DS v2.

Contributor:
File sources do not use v2 by default, so this is not a necessary change for this commit. I think it should be in a separate PR. We can discuss whether it is a good idea to add this hack in the next DSv2 sync.

Please remove it from this PR.

Contributor:
I'm adding this to my agenda for the sync-up.

Contributor:
I believe that we agreed in the sync-up to remove this hack. Is there a reason why it is still included?

Contributor Author:
I explained in the comment: #24233 (comment)

Removing the hack breaks several tests and I'd like to do it in another PR. SupportsSaveMode is itself a hack, which means we already have a hack for file source v2 in DataFrameWriter.save. To keep this PR small, I think it's better to keep the existing hack (it's not creating a new one) and open a new PR to remove it and update the tests.

val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
.mode(mode)
.withQueryId(UUID.randomUUID().toString)
.withInputDataSchema(df.logicalPlan.schema)
.buildForBatch()
// The returned `Write` can be null, which indicates that we can skip writing.
if (write != null) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(write, df.logicalPlan)
}
}

case table: SupportsWrite if table.supports(BATCH_WRITE) =>
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
mode match {
@@ -282,24 +298,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}

case _ =>
table.newWriteBuilder(dsOptions) match {
case writeBuilder: SupportsSaveMode =>
val write = writeBuilder.mode(mode)
.withQueryId(UUID.randomUUID().toString)
.withInputDataSchema(df.logicalPlan.schema)
.buildForBatch()
// It can only return null with `SupportsSaveMode`. We can clean it up after
// removing `SupportsSaveMode`.
if (write != null) {
runCommand(df.sparkSession, "save") {
WriteToDataSourceV2(write, df.logicalPlan)
}
}

case _ =>
throw new AnalysisException(
s"data source ${table.name} does not support SaveMode $mode")
}
throw new AnalysisException(s"TableProvider implementation $source cannot be " +
"written with ErrorIfExists or Ignore modes, please use Append or Overwrite " +
"modes instead.")
}

// Streaming also uses the data source V2 API. So it may be that the data source implements
@@ -21,7 +21,6 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
@@ -46,9 +45,7 @@ private[noop] object NoopTable extends Table with SupportsWrite with SupportsStr
override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_WRITE).asJava
}

private[noop] object NoopWriteBuilder extends WriteBuilder
with SupportsSaveMode with SupportsTruncate {
override def mode(mode: SaveMode): WriteBuilder = this
private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate {
Member:
Since DSv2 (except file sources) can now only write to an existing table, the write path of NoopDataSource will still fail here (in the analyzer rule ResolveOutputRelation).

override def truncate(): WriteBuilder = this
override def buildForBatch(): BatchWrite = NoopBatchWrite
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
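A hedged sketch of the concern raised above, assuming a SparkSession named spark is in scope (the behavior at this commit is not verified here): NoopTable reports an empty schema, so appending a query that has columns would be rejected during analysis rather than silently discarded.

// Illustration of the reviewer's concern, not a test from this PR: NoopTable has no
// columns, so AppendData against it would fail output resolution (ResolveOutputRelation)
// when the incoming query does have columns.
spark.range(10)
  .write
  .format("noop")   // resolves to NoopDataSource via its DataSourceRegister short name
  .mode("append")
  .save()           // per the comment above, expected to fail analysis at this commit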
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaUtils
@@ -43,8 +43,7 @@ abstract class FileWriteBuilder(
options: CaseInsensitiveStringMap,
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean)
extends WriteBuilder with SupportsSaveMode {
supportsDataType: DataType => Boolean) extends WriteBuilder {
private var schema: StructType = _
private var queryId: String = _
private var mode: SaveMode = _
@@ -59,7 +58,7 @@
this
}

override def mode(mode: SaveMode): WriteBuilder = {
def mode(mode: SaveMode): WriteBuilder = {
this.mode = mode
this
}
@@ -25,14 +25,13 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
Member:
Nit: import org.apache.spark.sql.sources.v2.writer._

Contributor:
I don't agree with this.

Wildcard imports make it difficult to cherry-pick commits and increase conflicts. They also make it difficult to see where symbols are coming from, and they pollute the namespace with everything in a package instead of just the required names.

For example, I recently hit problems adding a logical package for expressions because of places that imported expressions._ along with plans._.
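A small illustration of the hazard described above, using a different pair of classes than the reviewer's expressions/plans example but showing the same failure mode: two packages that each define a class named Filter. Explicit, aliased imports keep every reference unambiguous, whereas wildcard-importing both packages would make a bare Filter reference ambiguous.

import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter}
import org.apache.spark.sql.sources.{Filter => SourceFilter}

// Explicit, aliased imports make it obvious which Filter is which; two wildcard
// imports of these packages would leave a bare `Filter` reference ambiguous.
def describe(x: AnyRef): String = x match {
  case _: LogicalFilter => "a catalyst logical-plan Filter"
  case _: SourceFilter  => "a data-source Filter (as pushed to readers and writers)"
  case _                => "something else"
}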

import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{LongAccumulator, Utils}

@@ -58,13 +57,7 @@ case class AppendDataExec(
query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {

override protected def doExecute(): RDD[InternalRow] = {
val batchWrite = newWriteBuilder() match {
case builder: SupportsSaveMode =>
builder.mode(SaveMode.Append).buildForBatch()

case builder =>
builder.buildForBatch()
}
val batchWrite = newWriteBuilder().buildForBatch()
doWrite(batchWrite)
}
}
@@ -94,9 +87,6 @@ case class OverwriteByExpressionExec(
case builder: SupportsTruncate if isTruncate(deleteWhere) =>
builder.truncate().buildForBatch()

case builder: SupportsSaveMode if isTruncate(deleteWhere) =>
builder.mode(SaveMode.Overwrite).buildForBatch()

case builder: SupportsOverwrite =>
builder.overwrite(deleteWhere).buildForBatch()

@@ -127,9 +117,6 @@ case class OverwritePartitionsDynamicExec(
case builder: SupportsDynamicOverwrite =>
builder.overwriteDynamicPartitions().buildForBatch()

case builder: SupportsSaveMode =>
builder.mode(SaveMode.Overwrite).buildForBatch()

case _ =>
throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
}
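For reference, a minimal sketch with assumed names (not from this PR) of the kind of WriteBuilder the remaining branches above expect now that the SaveMode cases are gone: truncate() serves the AlwaysTrue case in OverwriteByExpressionExec, overwrite(...) serves arbitrary filters, and overwriteDynamicPartitions() serves OverwritePartitionsDynamicExec.

import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder}

// Hypothetical builder implementing the three overwrite-related mix-ins matched above.
class ExampleOverwriteBuilder(delegate: BatchWrite) extends WriteBuilder
    with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {

  override def truncate(): WriteBuilder = this  // a real source would drop all existing data

  override def overwrite(filters: Array[Filter]): WriteBuilder = {
    // a real source would remember `filters` and delete only the matching rows
    this
  }

  override def overwriteDynamicPartitions(): WriteBuilder = this

  override def buildForBatch(): BatchWrite = delegate
}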
@@ -292,8 +279,8 @@ object DataWritingSparkTask extends Logging {
}

private[v2] case class DataWritingSparkTaskResult(
numRows: Long,
writerCommitMessage: WriterCommitMessage)
numRows: Long,
writerCommitMessage: WriterCommitMessage)

/**
* Sink progress information collected after commit.
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import test.org.apache.spark.sql.sources.v2._

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
@@ -218,36 +218,30 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
val path = file.getCanonicalPath
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)

spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).select('id, -'id))

// test with different save modes
spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("append").save()
Contributor:
I don't think this change is needed because the default mode for v2 is append.

Contributor Author:
Before my PR, we wrote the files twice: once without a save mode, once with append mode.

Now I switch the order, to make sure that the second write doesn't specify a save mode, which proves that the default mode is append.

checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).union(spark.range(10)).select('id, -'id))
spark.range(10).select('id, -'id))

spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("overwrite").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(5).select('id, -'id))

spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("ignore").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(5).select('id, -'id))
val e = intercept[AnalysisException] {
spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("ignore").save()
}
assert(e.message.contains("please use Append or Overwrite modes instead"))

val e = intercept[Exception] {
val e2 = intercept[AnalysisException] {
spark.range(5).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).mode("error").save()
}
assert(e.getMessage.contains("data already exists"))
assert(e2.getMessage.contains("please use Append or Overwrite modes instead"))

// test transaction
val failingUdf = org.apache.spark.sql.functions.udf {
@@ -262,10 +256,10 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
}
// this input data will fail to read middle way.
val input = spark.range(10).select(failingUdf('id).as('i)).select('i, -'i as 'j)
val e2 = intercept[SparkException] {
val e3 = intercept[SparkException] {
input.write.format(cls.getName).option("path", path).mode("overwrite").save()
}
assert(e2.getMessage.contains("Writing job aborted"))
assert(e3.getMessage.contains("Writing job aborted"))
// make sure we don't have partial data.
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
}
@@ -280,7 +274,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {

val numPartition = 6
spark.range(0, 10, 1, numPartition).select('id as 'i, -'id as 'j).write.format(cls.getName)
.option("path", path).save()
.option("path", path).mode("append").save()
checkAnswer(
spark.read.format(cls.getName).option("path", path).load(),
spark.range(10).select('id, -'id))
@@ -367,31 +361,13 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
val format = classOf[SimpleWritableDataSource].getName

val df = Seq((1L, 2L)).toDF("i", "j")
df.write.format(format).option("path", optionPath).save()
df.write.format(format).option("path", optionPath).mode("append").save()
assert(!new File(sessionPath).exists)
checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
}
}
}
}

test("SPARK-25700: do not read schema when writing in other modes except append and overwrite") {
Contributor Author:
This test is not needed anymore, as ErrorIfExists and Ignore modes are unsupported now.

withTempPath { file =>
val cls = classOf[SimpleWriteOnlyDataSource]
val path = file.getCanonicalPath
val df = spark.range(5).select('id as 'i, -'id as 'j)
// non-append mode should not throw exception, as they don't access schema.
df.write.format(cls.getName).option("path", path).mode("error").save()
df.write.format(cls.getName).option("path", path).mode("ignore").save()
// append and overwrite modes will access the schema and should throw exception.
intercept[SchemaReadAttemptException] {
df.write.format(cls.getName).option("path", path).mode("append").save()
}
intercept[SchemaReadAttemptException] {
df.write.format(cls.getName).option("path", path).mode("overwrite").save()
}
}
}
}


@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.sources.v2.reader._
@@ -70,38 +69,26 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport {
override def readSchema(): StructType = tableSchema
}

class MyWriteBuilder(path: String) extends WriteBuilder with SupportsSaveMode {
class MyWriteBuilder(path: String) extends WriteBuilder with SupportsTruncate {
private var queryId: String = _
private var mode: SaveMode = _
private var needTruncate = false

override def withQueryId(queryId: String): WriteBuilder = {
this.queryId = queryId
this
}

override def mode(mode: SaveMode): WriteBuilder = {
this.mode = mode
override def truncate(): WriteBuilder = {
this.needTruncate = true
this
}

override def buildForBatch(): BatchWrite = {
assert(mode != null)

val hadoopPath = new Path(path)
val hadoopConf = SparkContext.getActive.get.hadoopConfiguration
val fs = hadoopPath.getFileSystem(hadoopConf)

if (mode == SaveMode.ErrorIfExists) {
if (fs.exists(hadoopPath)) {
throw new RuntimeException("data already exists.")
}
}
if (mode == SaveMode.Ignore) {
if (fs.exists(hadoopPath)) {
return null
}
}
if (mode == SaveMode.Overwrite) {
if (needTruncate) {
fs.delete(hadoopPath, true)
}

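A closing usage sketch, assuming a SparkSession named spark and its implicits are in scope (the path is hypothetical and this snippet is not part of the PR): after this change, an overwrite against SimpleWritableDataSource is expressed through SupportsTruncate rather than SaveMode, so the physical plan calls truncate().buildForBatch() and MyWriteBuilder deletes the target directory before writing.

import spark.implicits._

val format = classOf[SimpleWritableDataSource].getName
Seq((1L, 2L), (3L, 4L)).toDF("i", "j").write
  .format(format)
  .option("path", "/tmp/simple-writable-demo")  // hypothetical location
  .mode("overwrite")                            // planned as an overwrite with AlwaysTrue
  .save()                                       // builder.truncate().buildForBatch() runs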