From e320a1ece922e82dcb79d5e76be0001696fb7356 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 3 Dec 2018 22:38:43 +0800 Subject: [PATCH 1/7] data source v2 API refactor (batch write) --- .../sources/v2/BatchWriteSupportProvider.java | 59 ------------- .../spark/sql/sources/v2/DataSourceV2.java | 10 +-- .../sql/sources/v2/SupportsBatchWrite.java | 34 +++++++ .../spark/sql/sources/v2/SupportsRead.java | 5 +- .../spark/sql/sources/v2/SupportsWrite.java | 40 +++++++++ ...BatchWriteSupport.java => BatchWrite.java} | 2 +- .../sql/sources/v2/writer/DataWriter.java | 12 +-- .../sources/v2/writer/DataWriterFactory.java | 2 +- .../sql/sources/v2/writer/WriteBuilder.java | 30 +++++++ .../v2/writer/WriterCommitMessage.java | 2 +- .../apache/spark/sql/DataFrameReader.scala | 8 +- .../apache/spark/sql/DataFrameWriter.scala | 36 ++++---- .../datasources/v2/DataSourceV2Relation.scala | 88 +++++-------------- .../datasources/v2/DataSourceV2Strategy.scala | 7 +- .../v2/WriteToDataSourceV2Exec.scala | 26 +++--- .../streaming/MicroBatchExecution.scala | 4 +- ...ritSupport.scala => MicroBatchWrite.scala} | 7 +- .../sources/PackedRowWriterFactory.scala | 4 +- .../sql/sources/v2/DataSourceV2Suite.scala | 33 +------ .../sources/v2/SimpleWritableDataSource.scala | 76 ++++++++-------- 20 files changed, 221 insertions(+), 264 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java rename sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/{BatchWriteSupport.java => BatchWrite.java} (99%) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java rename sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/{MicroBatchWritSupport.scala => MicroBatchWrite.scala} (84%) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java deleted file mode 100644 index df439e2c02fe..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import java.util.Optional; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport; -import org.apache.spark.sql.types.StructType; - -/** - * A mix-in interface for {@link DataSourceV2}. 
Data sources can implement this interface to - * provide data writing ability for batch processing. - * - * This interface is used to create {@link BatchWriteSupport} instances when end users run - * {@code Dataset.write.format(...).option(...).save()}. - */ -@Evolving -public interface BatchWriteSupportProvider extends DataSourceV2 { - - /** - * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source, - * which is called by Spark at the beginning of each batch query. - * - * Data sources can return None if there is no writing needed to be done according to the save - * mode. - * - * @param queryId A unique string for the writing query. It's possible that there are many - * writing queries running at the same time, and the returned - * {@link BatchWriteSupport} can use this id to distinguish itself from others. - * @param schema the schema of the data to be written. - * @param mode the save mode which determines what to do when the data are already in this data - * source, please refer to {@link SaveMode} for more details. - * @param options the options for the returned data source writer, which is an immutable - * case-insensitive string-to-string map. - * @return a write support to write data to this data source. - */ - Optional createBatchWriteSupport( - String queryId, - StructType schema, - SaveMode mode, - DataSourceOptions options); -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java index eae7a45d1d44..4aaa57dd4db9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java @@ -20,15 +20,7 @@ import org.apache.spark.annotation.Evolving; /** - * The base interface for data source v2. Implementations must have a public, 0-arg constructor. - * - * Note that this is an empty interface. Data source implementations must mix in interfaces such as - * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide - * batch or streaming read/write support instances. Otherwise it's just a dummy data source which - * is un-readable/writable. - * - * If Spark fails to execute any methods in the implementations of this interface (by throwing an - * exception), the read action will fail and no Spark job will be submitted. + * TODO: remove it when we finish the API refactor for streaming side. */ @Evolving public interface DataSourceV2 {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java new file mode 100644 index 000000000000..d0204413b4e5 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. + *
+ * If a {@link Table} implements this interface, the
+ * {@link SupportsWrite#newWriteBuilder(StructType, DataSourceOptions)} must return a
+ * {@link WriteBuilder} with {@link WriteBuilder#buildWithSaveMode(SaveMode)} implemented.
+ *
+ */ +@Evolving +public interface SupportsBatchWrite extends SupportsWrite {} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java index e22738d20d50..5a0adf0d74e9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java @@ -29,7 +29,10 @@ interface SupportsRead extends Table { /** * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this - * method to configure each scan. + * method to configure each data source scan. + * + * @param options The options for writing, which is an immutable case-insensitive + * string-to-string map. */ ScanBuilder newScanBuilder(DataSourceOptions options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java new file mode 100644 index 000000000000..18e933a1b355 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2; + +import org.apache.spark.sql.sources.v2.writer.BatchWrite; +import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.types.StructType; + +/** + * An internal base interface of mix-in interfaces for writable {@link Table}. This adds + * {@link #newWriteBuilder(StructType, DataSourceOptions)} that is used to create a write for batch + * or streaming. + */ +interface SupportsWrite extends Table { + + /** + * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call + * this method to configure each data source write. + * + * @param schema The schema of the data to write. + * @param options The options for writing, which is an immutable case-insensitive + * string-to-string map. 
+ */ + WriteBuilder newWriteBuilder(StructType schema, DataSourceOptions options); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java similarity index 99% rename from sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java rename to sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java index efe1ac4f78db..91297759971b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWrite.java @@ -38,7 +38,7 @@ * Please refer to the documentation of commit/abort methods for detailed specifications. */ @Evolving -public interface BatchWriteSupport { +public interface BatchWrite { /** * Creates a writer factory which will be serialized and sent to executors. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index d142ee523ef9..11228ad1ea67 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -36,11 +36,11 @@ * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data + * {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an * exception will be sent to the driver side, and Spark may retry this writing task a few times. * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a - * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])} + * different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])} * when the configured number of retries is exhausted. * * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task @@ -71,11 +71,11 @@ public interface DataWriter { /** * Commits this writer after all records are written successfully, returns a commit message which * will be sent back to driver side and passed to - * {@link BatchWriteSupport#commit(WriterCommitMessage[])}. + * {@link BatchWrite#commit(WriterCommitMessage[])}. * * The written data should only be visible to data source readers after - * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method - * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to + * {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method + * should still "hide" the written data and ask the {@link BatchWrite} at driver side to * do the final commit via {@link WriterCommitMessage}. * * If this method fails (by throwing an exception), {@link #abort()} will be called and this @@ -93,7 +93,7 @@ public interface DataWriter { * failed. 
* * If this method fails(by throwing an exception), the underlying data source may have garbage - * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually, + * that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually, * but these garbage should not be visible to data source readers. * * @throws IOException if failure happens during disk/network IO like writing files. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java index 65105f46b82d..bf2db9059b08 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.InternalRow; /** - * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()}, + * A factory of {@link DataWriter} returned by {@link BatchWrite#createBatchWriterFactory()}, * which is responsible for creating and initializing the actual data writer at executor side. * * Note that, the writer factory will be serialized and sent to executors, then the data writer diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java new file mode 100644 index 000000000000..1bd8a27251f0 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import java.util.Optional; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.SaveMode; + +@Evolving +public interface WriteBuilder { + + // TODO: remove it after we finish all the new write operators. 
+ Optional buildWithSaveMode(SaveMode mode); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java index 9216e3439909..6334c8f64309 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java @@ -24,7 +24,7 @@ /** * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side - * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or + * as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}. * * This is an empty interface, data sources should define their own message class and use it when diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index ce8e4c8f5b82..192c045c0805 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.sources.v2._ -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String /** @@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { case _ => provider.getTable(dsOptions) } table match { - case s: SupportsBatchRead => - Dataset.ofRows(sparkSession, DataSourceV2Relation.create( - provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema)) - + case readableTable: SupportsBatchRead => + Dataset.ofRows(sparkSession, DataSourceV2Relation.create(readableTable, dsOptions)) case _ => loadV1Source(paths: _*) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 981b3a8fd4ac..eb6b10b2a01c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.util.{Locale, Properties, UUID} +import java.util.{Locale, Properties} import scala.collection.JavaConverters._ @@ -241,32 +241,26 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotBucketed("save") - val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf) - if (classOf[DataSourceV2].isAssignableFrom(cls)) { - val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2] - source match { - case provider: BatchWriteSupportProvider => - val sessionOptions = DataSourceV2Utils.extractSessionConfigs( - source, - df.sparkSession.sessionState.conf) - val options = sessionOptions ++ extraOptions - + val session = df.sparkSession + val cls = DataSource.lookupDataSource(source, session.sessionState.conf) + if (classOf[TableProvider].isAssignableFrom(cls)) { + val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] + val sessionOptions = DataSourceV2Utils.extractSessionConfigs( 
+ provider, session.sessionState.conf) + val options = sessionOptions ++ extraOptions + val dsOptions = new DataSourceOptions(options.asJava) + provider.getTable(dsOptions) match { + case table: SupportsBatchWrite => + val relation = DataSourceV2Relation.create(table, dsOptions) if (mode == SaveMode.Append) { - val relation = DataSourceV2Relation.createRelationForWrite(source, options) runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } - } else { - val writer = provider.createBatchWriteSupport( - UUID.randomUUID().toString, - df.logicalPlan.output.toStructType, - mode, - new DataSourceOptions(options.asJava)) - - if (writer.isPresent) { + val write = relation.newWriteBuilder(df.logicalPlan.schema).buildWithSaveMode(mode) + if (write.isPresent) { runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(writer.get, df.logicalPlan) + WriteToDataSourceV2(write.get, df.logicalPlan) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 7bf2b8bff373..0b6fdcc71f65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,50 +17,46 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport +import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.types.StructType /** - * A logical plan representing a data source v2 scan. + * A logical plan representing a data source v2 table. * - * @param source An instance of a [[DataSourceV2]] implementation. - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]]. - * @param userSpecifiedSchema The user-specified schema for this scan. + * @param table The table that this relation represents. + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]] + * and [[WriteBuilder]]. */ case class DataSourceV2Relation( - // TODO: remove `source` when we finish API refactor for write. - source: TableProvider, - table: SupportsBatchRead, + table: Table, output: Seq[AttributeReference], - options: Map[String, String], - userSpecifiedSchema: Option[StructType] = None) + // TODO: use a simple case insensitive map instead. 
+ options: DataSourceOptions) extends LeafNode with MultiInstanceRelation with NamedRelation { - import DataSourceV2Relation._ - override def name: String = table.name() override def simpleString(maxFields: Int): String = { s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } - def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) + def newScanBuilder(): ScanBuilder = table match { + case s: SupportsBatchRead => s.newScanBuilder(options) + case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}") + } + - def newScanBuilder(): ScanBuilder = { - val dsOptions = new DataSourceOptions(options.asJava) - table.newScanBuilder(dsOptions) + + def newWriteBuilder(schema: StructType): WriteBuilder = table match { + case s: SupportsBatchWrite => s.newWriteBuilder(schema, options) + case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}") } override def computeStats(): Statistics = { @@ -126,52 +122,8 @@ case class StreamingDataSourceV2Relation( } object DataSourceV2Relation { - private implicit class SourceHelpers(source: DataSourceV2) { - def asWriteSupportProvider: BatchWriteSupportProvider = { - source match { - case provider: BatchWriteSupportProvider => - provider - case _ => - throw new AnalysisException(s"Data source is not writable: $name") - } - } - - def name: String = { - source match { - case registered: DataSourceRegister => - registered.shortName() - case _ => - source.getClass.getSimpleName - } - } - - def createWriteSupport( - options: Map[String, String], - schema: StructType): BatchWriteSupport = { - asWriteSupportProvider.createBatchWriteSupport( - UUID.randomUUID().toString, - schema, - SaveMode.Append, - new DataSourceOptions(options.asJava)).get - } - } - - def create( - provider: TableProvider, - table: SupportsBatchRead, - options: Map[String, String], - userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { + def create(table: Table, options: DataSourceOptions): DataSourceV2Relation = { val output = table.schema().toAttributes - DataSourceV2Relation(provider, table, output, options, userSpecifiedSchema) - } - - // TODO: remove this when we finish API refactor for write. 
- def createRelationForWrite( - source: DataSourceV2, - options: Map[String, String]): DataSourceV2Relation = { - val provider = source.asInstanceOf[TableProvider] - val dsOptions = new DataSourceOptions(options.asJava) - val table = provider.getTable(dsOptions) - create(provider, table.asInstanceOf[SupportsBatchRead], options) + DataSourceV2Relation(table, output, options) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 2e26fce880b6..90d66dabd32f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.{sources, Strategy} +import org.apache.spark.sql.{sources, SaveMode, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition} @@ -110,7 +110,7 @@ object DataSourceV2Strategy extends Strategy { val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters) logInfo( s""" - |Pushing operators to ${relation.source.getClass} + |Pushing operators to ${relation.name} |Pushed Filters: ${pushedFilters.mkString(", ")} |Post-Scan Filters: ${postScanFilters.mkString(",")} |Output: ${output.mkString(", ")} @@ -136,7 +136,8 @@ object DataSourceV2Strategy extends Strategy { WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil + val write = r.newWriteBuilder(query.schema).buildWithSaveMode(SaveMode.Append).get() + WriteToDataSourceV2Exec(write, planLater(query)) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index d7e20eed4cbc..406fb8c3a383 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -35,7 +35,7 @@ import org.apache.spark.util.{LongAccumulator, Utils} * specific logical plans, like [[org.apache.spark.sql.catalyst.plans.logical.AppendData]]. */ @deprecated("Use specific logical plans like AppendData instead", "2.4.0") -case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPlan) +case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) extends LogicalPlan { override def children: Seq[LogicalPlan] = Seq(query) override def output: Seq[Attribute] = Nil @@ -44,7 +44,7 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl /** * The physical plan for writing data into data source v2. 
*/ -case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan) +case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan) extends UnaryExecNode { var commitProgress: Option[StreamWriterCommitProgress] = None @@ -53,13 +53,13 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark override def output: Seq[Attribute] = Nil override protected def doExecute(): RDD[InternalRow] = { - val writerFactory = writeSupport.createBatchWriterFactory() - val useCommitCoordinator = writeSupport.useCommitCoordinator + val writerFactory = batchWrite.createBatchWriterFactory() + val useCommitCoordinator = batchWrite.useCommitCoordinator val rdd = query.execute() val messages = new Array[WriterCommitMessage](rdd.partitions.length) val totalNumRowsAccumulator = new LongAccumulator() - logInfo(s"Start processing data source write support: $writeSupport. " + + logInfo(s"Start processing data source write support: $batchWrite. " + s"The input RDD has ${messages.length} partitions.") try { @@ -72,26 +72,26 @@ case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: Spark val commitMessage = result.writerCommitMessage messages(index) = commitMessage totalNumRowsAccumulator.add(result.numRows) - writeSupport.onDataWriterCommit(commitMessage) + batchWrite.onDataWriterCommit(commitMessage) } ) - logInfo(s"Data source write support $writeSupport is committing.") - writeSupport.commit(messages) - logInfo(s"Data source write support $writeSupport committed.") + logInfo(s"Data source write support $batchWrite is committing.") + batchWrite.commit(messages) + logInfo(s"Data source write support $batchWrite committed.") commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { case cause: Throwable => - logError(s"Data source write support $writeSupport is aborting.") + logError(s"Data source write support $batchWrite is aborting.") try { - writeSupport.abort(messages) + batchWrite.abort(messages) } catch { case t: Throwable => - logError(s"Data source write support $writeSupport failed to abort.") + logError(s"Data source write support $batchWrite failed to abort.") cause.addSuppressed(t) throw new SparkException("Writing job failed.", cause) } - logError(s"Data source write support $writeSupport aborted.") + logError(s"Data source write support $batchWrite aborted.") cause match { // Only wrap non fatal exceptions. 
case NonFatal(e) => throw new SparkException("Writing job aborted.", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 38ecb0dd12da..db1bf32a156c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec} -import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport} +import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchReadSupport} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} @@ -515,7 +515,7 @@ class MicroBatchExecution( newAttributePlan.schema, outputMode, new DataSourceOptions(extraOptions.asJava)) - WriteToDataSourceV2(new MicroBatchWritSupport(currentBatchId, writer), newAttributePlan) + WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan) case _ => throw new IllegalArgumentException(s"unknown sink type for $sink") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala similarity index 84% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala index 9f88416871f8..143235efee81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWritSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/MicroBatchWrite.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql.execution.streaming.sources import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport} /** - * A [[BatchWriteSupport]] used to hook V2 stream writers into a microbatch plan. It implements + * A [[BatchWrite]] used to hook V2 stream writers into a microbatch plan. It implements * the non-streaming interface, forwarding the epoch ID determined at construction to a wrapped * streaming write support. 
*/ -class MicroBatchWritSupport(eppchId: Long, val writeSupport: StreamingWriteSupport) - extends BatchWriteSupport { +class MicroBatchWrite(eppchId: Long, val writeSupport: StreamingWriteSupport) extends BatchWrite { override def commit(messages: Array[WriterCommitMessage]): Unit = { writeSupport.commit(eppchId, messages) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala index ac3c71cc222b..fd4cb444ce58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala @@ -21,12 +21,12 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.sources.v2.writer.{BatchWriteSupport, DataWriter, DataWriterFactory, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.StreamingDataWriterFactory /** * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery - * to a [[BatchWriteSupport]] on the driver. + * to a [[BatchWrite]] on the driver. * * Note that, because it sends all rows to the driver, this factory will generally be unsuitable * for production-quality sinks. It's intended for use in tests. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index d282193d35d7..216d6cb1f779 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -329,8 +329,8 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { .format(classOf[DataSourceV2WithSessionConfig].getName).load() val options = df.queryExecution.optimizedPlan.collectFirst { case d: DataSourceV2Relation => d.options - } - assert(options.get.get(optionName) == Some("false")) + }.get + assert(options.get(optionName).get == "false") } } @@ -350,24 +350,6 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } } - - test("SPARK-25700: do not read schema when writing in other modes except append mode") { - withTempPath { file => - val cls = classOf[SimpleWriteOnlyDataSource] - val path = file.getCanonicalPath - val df = spark.range(5).select('id as 'i, -'id as 'j) - try { - df.write.format(cls.getName).option("path", path).mode("error").save() - df.write.format(cls.getName).option("path", path).mode("overwrite").save() - df.write.format(cls.getName).option("path", path).mode("ignore").save() - } catch { - case e: SchemaReadAttemptException => fail("Schema read was attempted.", e) - } - intercept[SchemaReadAttemptException] { - df.write.format(cls.getName).option("path", path).mode("append").save() - } - } - } } @@ -676,14 +658,3 @@ object SpecificReaderFactory extends PartitionReaderFactory { } } } - -class SchemaReadAttemptException(m: String) extends RuntimeException(m) - -class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { - override def writeSchema(): StructType = { - // This is a bit hacky since this source implements read support but throws - // during schema retrieval. 
Might have to rewrite but it's done - // such so for minimised changes. - throw new SchemaReadAttemptException("read is not supported") - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 82bb4fa33a3a..d90637aa0b2c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources.v2 import java.io.{BufferedReader, InputStreamReader, IOException} -import java.util.Optional +import java.util.{Optional, UUID} import scala.collection.JavaConverters._ @@ -35,15 +35,13 @@ import org.apache.spark.util.SerializableConfiguration /** * A HDFS based transactional writable data source. - * Each task writes data to `target/_temporary/queryId/$jobId-$partitionId-$attemptNumber`. - * Each job moves files from `target/_temporary/queryId/` to `target`. + * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`. + * Each job moves files from `target/_temporary/uniqueId/` to `target`. */ class SimpleWritableDataSource extends DataSourceV2 - with TableProvider - with BatchWriteSupportProvider - with SessionConfigSupport { + with TableProvider with SessionConfigSupport { - protected def writeSchema(): StructType = new StructType().add("i", "long").add("j", "long") + private val tableSchema = new StructType().add("i", "long").add("j", "long") override def keyPrefix: String = "simpleWritableDataSource" @@ -68,22 +66,35 @@ class SimpleWritableDataSource extends DataSourceV2 new CSVReaderFactory(serializableConf) } - override def readSchema(): StructType = writeSchema + override def readSchema(): StructType = tableSchema } - override def getTable(options: DataSourceOptions): Table = { - val path = new Path(options.get("path").get()) - val conf = SparkContext.getActive.get.hadoopConfiguration - new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { - new MyScanBuilder(path.toUri.toString, conf) + class MyWriteBuilder(path: String, uniqueId: String) extends WriteBuilder { + override def buildWithSaveMode(mode: SaveMode): Optional[BatchWrite] = { + val hadoopPath = new Path(path) + val hadoopConf = SparkContext.getActive.get.hadoopConfiguration + val fs = hadoopPath.getFileSystem(hadoopConf) + + if (mode == SaveMode.ErrorIfExists) { + if (fs.exists(hadoopPath)) { + throw new RuntimeException("data already exists.") + } + } + if (mode == SaveMode.Ignore) { + if (fs.exists(hadoopPath)) { + return Optional.empty() + } + } + if (mode == SaveMode.Overwrite) { + fs.delete(hadoopPath, true) } - override def schema(): StructType = writeSchema + val pathStr = hadoopPath.toUri.toString + Optional.of(new MyBatchWrite(uniqueId, pathStr, hadoopConf)) } } - class WritSupport(queryId: String, path: String, conf: Configuration) extends BatchWriteSupport { + class MyBatchWrite(queryId: String, path: String, conf: Configuration) extends BatchWrite { override def createBatchWriterFactory(): DataWriterFactory = { SimpleCounter.resetCounter new CSVDataWriterFactory(path, queryId, new SerializableConfiguration(conf)) @@ -116,34 +127,25 @@ class SimpleWritableDataSource extends DataSourceV2 } } - override def createBatchWriteSupport( - queryId: String, - schema: StructType, - mode: SaveMode, - options: DataSourceOptions): 
Optional[BatchWriteSupport] = { - assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false)) - + override def getTable(options: DataSourceOptions): Table = { val path = new Path(options.get("path").get()) val conf = SparkContext.getActive.get.hadoopConfiguration - val fs = path.getFileSystem(conf) + new SimpleBatchTable with SupportsBatchWrite { + + override def schema(): StructType = tableSchema - if (mode == SaveMode.ErrorIfExists) { - if (fs.exists(path)) { - throw new RuntimeException("data already exists.") + override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + new MyScanBuilder(path.toUri.toString, conf) } - } - if (mode == SaveMode.Ignore) { - if (fs.exists(path)) { - return Optional.empty() + + override def newWriteBuilder(schema: StructType, options: DataSourceOptions): WriteBuilder = { + val path = options.get("path").get() + val uniqueId = UUID.randomUUID().toString + new MyWriteBuilder(path, uniqueId) } } - if (mode == SaveMode.Overwrite) { - fs.delete(path, true) - } - - val pathStr = path.toUri.toString - Optional.of(new WritSupport(queryId, pathStr, conf)) } + } case class CSVInputPartitionReader(path: String) extends InputPartition From 1fbd5301610a9b098c005ca1238d7e44bf42cf01 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 14 Dec 2018 12:09:07 +0800 Subject: [PATCH 2/7] another idea --- .../sql/sources/v2/SupportsBatchWrite.java | 3 +-- .../spark/sql/sources/v2/SupportsRead.java | 2 +- .../sources/v2/writer/SupportsSaveMode.java | 26 +++++++++++++++++++ .../sql/sources/v2/writer/WriteBuilder.java | 23 ++++++++++++---- .../apache/spark/sql/DataFrameReader.scala | 4 +-- .../apache/spark/sql/DataFrameWriter.scala | 22 +++++++++++----- .../datasources/v2/DataSourceV2Relation.scala | 15 +++++++---- .../datasources/v2/DataSourceV2Strategy.scala | 13 +++++++--- .../sources/v2/SimpleWritableDataSource.scala | 19 ++++++++++---- 9 files changed, 98 insertions(+), 29 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java index d0204413b4e5..6e4f73a8ead8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources.v2; import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; import org.apache.spark.sql.types.StructType; @@ -27,7 +26,7 @@ *
 * If a {@link Table} implements this interface, the
 * {@link SupportsWrite#newWriteBuilder(StructType, DataSourceOptions)} must return a
- * {@link WriteBuilder} with {@link WriteBuilder#buildWithSaveMode(SaveMode)} implemented.
+ * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented.
 *
*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java index 5a0adf0d74e9..5031c71c0fd4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java @@ -31,7 +31,7 @@ interface SupportsRead extends Table { * Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this * method to configure each data source scan. * - * @param options The options for writing, which is an immutable case-insensitive + * @param options The options for reading, which is an immutable case-insensitive * string-to-string map. */ ScanBuilder newScanBuilder(DataSourceOptions options); diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java new file mode 100644 index 000000000000..ad936d7895e1 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.sql.SaveMode; + +// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before +// Spark 3.0 when all the new write operators are finished. +public interface SupportsSaveMode extends WriteBuilder { + WriteBuilder mode(SaveMode mode); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java index 1bd8a27251f0..71bf37a469a8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java @@ -17,14 +17,27 @@ package org.apache.spark.sql.sources.v2.writer; -import java.util.Optional; - import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.sources.v2.SupportsBatchWrite; +import org.apache.spark.sql.sources.v2.Table; +/** + * An interface for building the {@link BatchWrite}. Implementations can mix in interfaces like + * {@link SupportsSaveMode} to support different ways to write data to data sources. + */ @Evolving public interface WriteBuilder { - // TODO: remove it after we finish all the new write operators. - Optional buildWithSaveMode(SaveMode mode); + /** + * Returns a {@link BatchWrite} to write data to batch source. 
By default this method throws + * exception, data sources must overwrite this method to provide an implementation, if the + * {@link Table} that creates this scan implements {@link SupportsBatchWrite}. + * + * Note that, the returned {@link BatchWrite} can be null if the implementation supports SaveMode, + * to indicate that no writing is needed. We can clean it up after removing + * {@link SupportsSaveMode}. + */ + default BatchWrite buildForBatch() { + throw new UnsupportedOperationException("Batch scans are not supported"); + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 192c045c0805..af369a5bca46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -209,8 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { case _ => provider.getTable(dsOptions) } table match { - case readableTable: SupportsBatchRead => - Dataset.ofRows(sparkSession, DataSourceV2Relation.create(readableTable, dsOptions)) + case _: SupportsBatchRead => + Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions)) case _ => loadV1Source(paths: _*) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index eb6b10b2a01c..a96c84288488 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, Logi import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, WriteToDataSourceV2} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ +import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode import org.apache.spark.sql.types.StructType /** @@ -251,17 +252,26 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val dsOptions = new DataSourceOptions(options.asJava) provider.getTable(dsOptions) match { case table: SupportsBatchWrite => - val relation = DataSourceV2Relation.create(table, dsOptions) + val relation = DataSourceV2Relation.create(table, options) if (mode == SaveMode.Append) { runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } } else { - val write = relation.newWriteBuilder(df.logicalPlan.schema).buildWithSaveMode(mode) - if (write.isPresent) { - runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(write.get, df.logicalPlan) - } + val writeBuilder = relation.newWriteBuilder(df.logicalPlan.schema) + writeBuilder match { + case s: SupportsSaveMode => + val write = s.mode(mode).buildForBatch() + // It can only return null with `SupportsSaveMode`. We can clean it up after + // removing `SupportsSaveMode`. 
+ if (write != null) { + runCommand(df.sparkSession, "save") { + WriteToDataSourceV2(write, df.logicalPlan) + } + } + + case _ => throw new AnalysisException( + s"data source ${relation.name} does not support SaveMode") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 0b6fdcc71f65..dbcaf5b5cf23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} @@ -37,8 +39,7 @@ import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( table: Table, output: Seq[AttributeReference], - // TODO: use a simple case insensitive map instead. - options: DataSourceOptions) + options: Map[String, String]) extends LeafNode with MultiInstanceRelation with NamedRelation { override def name: String = table.name() @@ -48,14 +49,18 @@ case class DataSourceV2Relation( } def newScanBuilder(): ScanBuilder = table match { - case s: SupportsBatchRead => s.newScanBuilder(options) + case s: SupportsBatchRead => + val dsOptions = new DataSourceOptions(options.asJava) + s.newScanBuilder(dsOptions) case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}") } def newWriteBuilder(schema: StructType): WriteBuilder = table match { - case s: SupportsBatchWrite => s.newWriteBuilder(schema, options) + case s: SupportsBatchWrite => + val dsOptions = new DataSourceOptions(options.asJava) + s.newWriteBuilder(schema, dsOptions) case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}") } @@ -122,7 +127,7 @@ case class StreamingDataSourceV2Relation( } object DataSourceV2Relation { - def create(table: Table, options: DataSourceOptions): DataSourceV2Relation = { + def create(table: Table, options: Map[String, String]): DataSourceV2Relation = { val output = table.schema().toAttributes DataSourceV2Relation(table, output, options) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 90d66dabd32f..79540b024621 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.{sources, SaveMode, Strategy} +import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition} @@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport +import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode object DataSourceV2Strategy extends Strategy { @@ -136,8 +137,14 @@ object DataSourceV2Strategy extends Strategy { WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - val write = r.newWriteBuilder(query.schema).buildWithSaveMode(SaveMode.Append).get() - WriteToDataSourceV2Exec(write, planLater(query)) :: Nil + val writeBuilder = r.newWriteBuilder(query.schema) + writeBuilder match { + case s: SupportsSaveMode => + val write = s.mode(SaveMode.Append).buildForBatch() + assert(write != null) + WriteToDataSourceV2Exec(write, planLater(query)) :: Nil + case _ => throw new AnalysisException(s"data source ${r.name} does not support SaveMode") + } case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index d90637aa0b2c..b2c0a637c8ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources.v2 import java.io.{BufferedReader, InputStreamReader, IOException} -import java.util.{Optional, UUID} +import java.util.UUID import scala.collection.JavaConverters._ @@ -69,8 +69,17 @@ class SimpleWritableDataSource extends DataSourceV2 override def readSchema(): StructType = tableSchema } - class MyWriteBuilder(path: String, uniqueId: String) extends WriteBuilder { - override def buildWithSaveMode(mode: SaveMode): Optional[BatchWrite] = { + class MyWriteBuilder(path: String, uniqueId: String) extends WriteBuilder with SupportsSaveMode { + private var mode: SaveMode = _ + + override def mode(mode: SaveMode): WriteBuilder = { + this.mode = mode + this + } + + override def buildForBatch(): BatchWrite = { + assert(mode != null) + val hadoopPath = new Path(path) val hadoopConf = SparkContext.getActive.get.hadoopConfiguration val fs = hadoopPath.getFileSystem(hadoopConf) @@ -82,7 +91,7 @@ class SimpleWritableDataSource extends DataSourceV2 } if (mode == SaveMode.Ignore) { if (fs.exists(hadoopPath)) { - return Optional.empty() + return null } } if (mode == SaveMode.Overwrite) { @@ -90,7 +99,7 @@ class SimpleWritableDataSource extends DataSourceV2 } val pathStr = hadoopPath.toUri.toString - Optional.of(new MyBatchWrite(uniqueId, pathStr, hadoopConf)) + new MyBatchWrite(uniqueId, pathStr, hadoopConf) } } From d19e76f0764b528ef2814fc5d190d2f5c01df934 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 14 Dec 2018 19:35:08 +0800 Subject: [PATCH 3/7] add back a test --- .../apache/spark/sql/DataFrameWriter.scala | 6 ++-- .../sql/sources/v2/DataSourceV2Suite.scala | 29 +++++++++++++++++++ .../sources/v2/SimpleWritableDataSource.scala | 26 ++++++++--------- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a96c84288488..f9e907279a8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -252,13 +252,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val dsOptions = new DataSourceOptions(options.asJava) provider.getTable(dsOptions) match { case table: SupportsBatchWrite => - val relation = DataSourceV2Relation.create(table, options) if (mode == SaveMode.Append) { + val relation = DataSourceV2Relation.create(table, options) runCommand(df.sparkSession, "save") { AppendData.byName(relation, df.logicalPlan) } } else { - val writeBuilder = relation.newWriteBuilder(df.logicalPlan.schema) + val writeBuilder = table.newWriteBuilder(df.logicalPlan.schema, dsOptions) writeBuilder match { case s: SupportsSaveMode => val write = s.mode(mode).buildForBatch() @@ -271,7 +271,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } case _ => throw new AnalysisException( - s"data source ${relation.name} does not support SaveMode") + s"data source ${table.name} does not support SaveMode") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 216d6cb1f779..c60ea4a2f9f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -350,6 +350,22 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-25700: do not read schema when writing in other modes except append mode") { + withTempPath { file => + val cls = classOf[SimpleWriteOnlyDataSource] + val path = file.getCanonicalPath + val df = spark.range(5).select('id as 'i, -'id as 'j) + // non-append mode should not throw exception, as they don't access schema. + df.write.format(cls.getName).option("path", path).mode("error").save() + df.write.format(cls.getName).option("path", path).mode("overwrite").save() + df.write.format(cls.getName).option("path", path).mode("ignore").save() + // append mode will access schema and should throw exception. 
+ intercept[SchemaReadAttemptException] { + df.write.format(cls.getName).option("path", path).mode("append").save() + } + } + } } @@ -658,3 +674,16 @@ object SpecificReaderFactory extends PartitionReaderFactory { } } } + +class SchemaReadAttemptException(m: String) extends RuntimeException(m) + +class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { + + override def getTable(options: DataSourceOptions): Table = { + new MyTable(options) { + override def schema(): StructType = { + throw new SchemaReadAttemptException("schema should not be read.") + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index b2c0a637c8ce..65f13b1a82ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -136,25 +136,25 @@ class SimpleWritableDataSource extends DataSourceV2 } } - override def getTable(options: DataSourceOptions): Table = { - val path = new Path(options.get("path").get()) - val conf = SparkContext.getActive.get.hadoopConfiguration - new SimpleBatchTable with SupportsBatchWrite { + class MyTable(options: DataSourceOptions) extends SimpleBatchTable with SupportsBatchWrite { + private val path = options.get("path").get() + private val conf = SparkContext.getActive.get.hadoopConfiguration - override def schema(): StructType = tableSchema + override def schema(): StructType = tableSchema - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { - new MyScanBuilder(path.toUri.toString, conf) - } + override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + new MyScanBuilder(new Path(path).toUri.toString, conf) + } - override def newWriteBuilder(schema: StructType, options: DataSourceOptions): WriteBuilder = { - val path = options.get("path").get() - val uniqueId = UUID.randomUUID().toString - new MyWriteBuilder(path, uniqueId) - } + override def newWriteBuilder(schema: StructType, options: DataSourceOptions): WriteBuilder = { + val uniqueId = UUID.randomUUID().toString + new MyWriteBuilder(path, uniqueId) } } + override def getTable(options: DataSourceOptions): Table = { + new MyTable(options) + } } case class CSVInputPartitionReader(path: String) extends InputPartition From 44cdb549342a7a8907ecdc7762c16155bfd3268b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 4 Jan 2019 16:21:43 +0800 Subject: [PATCH 4/7] add back queryId --- .../apache/spark/sql/sources/v2/SupportsBatchWrite.java | 2 +- .../org/apache/spark/sql/sources/v2/SupportsWrite.java | 9 ++++++--- .../scala/org/apache/spark/sql/DataFrameWriter.scala | 5 +++-- .../execution/datasources/v2/DataSourceV2Relation.scala | 4 +++- .../spark/sql/sources/v2/SimpleWritableDataSource.scala | 9 +++++---- 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java index 6e4f73a8ead8..f5eb4ce3936b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -25,7 +25,7 @@ * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. *

* If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(StructType, DataSourceOptions)} must return a + * {@link SupportsWrite#newWriteBuilder(String, StructType, DataSourceOptions)} must return a * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented. *

*/ diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java index 18e933a1b355..e3ae0257ca21 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java @@ -23,8 +23,8 @@ /** * An internal base interface of mix-in interfaces for writable {@link Table}. This adds - * {@link #newWriteBuilder(StructType, DataSourceOptions)} that is used to create a write for batch - * or streaming. + * {@link #newWriteBuilder(String, StructType, DataSourceOptions)} that is used to create a write + * for batch or streaming. */ interface SupportsWrite extends Table { @@ -32,9 +32,12 @@ interface SupportsWrite extends Table { * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call * this method to configure each data source write. * + * @param queryId A unique string for the writing query. It's possible that there are many + * writing queries running at the same time, or a query is restarted. + * {@link BatchWrite} can use this id to identify the query. * @param schema The schema of the data to write. * @param options The options for writing, which is an immutable case-insensitive * string-to-string map. */ - WriteBuilder newWriteBuilder(StructType schema, DataSourceOptions options); + WriteBuilder newWriteBuilder(String queryId, StructType schema, DataSourceOptions options); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index f9e907279a8c..b9cd449f7829 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.util.{Locale, Properties} +import java.util.{Locale, Properties, UUID} import scala.collection.JavaConverters._ @@ -258,7 +258,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { AppendData.byName(relation, df.logicalPlan) } } else { - val writeBuilder = table.newWriteBuilder(df.logicalPlan.schema, dsOptions) + val writeBuilder = table.newWriteBuilder( + UUID.randomUUID().toString, df.logicalPlan.schema, dsOptions) writeBuilder match { case s: SupportsSaveMode => val write = s.mode(mode).buildForBatch() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index dbcaf5b5cf23..f056714aff69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException @@ -60,7 +62,7 @@ case class DataSourceV2Relation( def newWriteBuilder(schema: StructType): WriteBuilder = table match { case s: SupportsBatchWrite => val dsOptions = new DataSourceOptions(options.asJava) - s.newWriteBuilder(schema, dsOptions) + s.newWriteBuilder(UUID.randomUUID().toString, schema, dsOptions) case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}") } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 65f13b1a82ea..61289a981e41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources.v2 import java.io.{BufferedReader, InputStreamReader, IOException} -import java.util.UUID import scala.collection.JavaConverters._ @@ -146,9 +145,11 @@ class SimpleWritableDataSource extends DataSourceV2 new MyScanBuilder(new Path(path).toUri.toString, conf) } - override def newWriteBuilder(schema: StructType, options: DataSourceOptions): WriteBuilder = { - val uniqueId = UUID.randomUUID().toString - new MyWriteBuilder(path, uniqueId) + override def newWriteBuilder( + queryId: String, + schema: StructType, + options: DataSourceOptions): WriteBuilder = { + new MyWriteBuilder(path, queryId) } } From ee7acbced67d3d084b1479b8303fe815701893ef Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 10 Jan 2019 11:19:35 +0800 Subject: [PATCH 5/7] improve doc --- .../java/org/apache/spark/sql/sources/v2/SupportsWrite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java index e3ae0257ca21..297b7ad068d3 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java @@ -32,8 +32,8 @@ interface SupportsWrite extends Table { * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call * this method to configure each data source write. * - * @param queryId A unique string for the writing query. It's possible that there are many - * writing queries running at the same time, or a query is restarted. + * @param queryId A unique string of the writing query. It's possible that there are many + * writing queries running at the same time, or a query is restarted and resumed. * {@link BatchWrite} can use this id to identify the query. * @param schema The schema of the data to write. 
* @param options The options for writing, which is an immutable case-insensitive From ec6129af6eff9dfef0336d3c0d02265aa79068a1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 11 Jan 2019 10:43:13 +0800 Subject: [PATCH 6/7] update --- .../sql/sources/v2/SupportsBatchWrite.java | 5 ++--- .../spark/sql/sources/v2/SupportsWrite.java | 12 ++---------- .../sql/sources/v2/writer/WriteBuilder.java | 17 +++++++++++++++++ .../org/apache/spark/sql/DataFrameWriter.scala | 5 +++-- .../datasources/v2/DataSourceV2Relation.scala | 4 +++- .../sources/v2/SimpleWritableDataSource.scala | 17 ++++++++++------- 6 files changed, 37 insertions(+), 23 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java index f5eb4ce3936b..08caadd5308e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -19,14 +19,13 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; -import org.apache.spark.sql.types.StructType; /** * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. *

* If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(String, StructType, DataSourceOptions)} must return a - * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented. + * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} + * with {@link WriteBuilder#buildForBatch()} implemented. *

*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java index 297b7ad068d3..ecdfe2073025 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java @@ -19,11 +19,10 @@ import org.apache.spark.sql.sources.v2.writer.BatchWrite; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; -import org.apache.spark.sql.types.StructType; /** * An internal base interface of mix-in interfaces for writable {@link Table}. This adds - * {@link #newWriteBuilder(String, StructType, DataSourceOptions)} that is used to create a write + * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write * for batch or streaming. */ interface SupportsWrite extends Table { @@ -31,13 +30,6 @@ interface SupportsWrite extends Table { /** * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call * this method to configure each data source write. - * - * @param queryId A unique string of the writing query. It's possible that there are many - * writing queries running at the same time, or a query is restarted and resumed. - * {@link BatchWrite} can use this id to identify the query. - * @param schema The schema of the data to write. - * @param options The options for writing, which is an immutable case-insensitive - * string-to-string map. */ - WriteBuilder newWriteBuilder(String queryId, StructType schema, DataSourceOptions options); + WriteBuilder newWriteBuilder(DataSourceOptions options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java index 71bf37a469a8..19c6e78c5240 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java @@ -20,6 +20,7 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.SupportsBatchWrite; import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.types.StructType; /** * An interface for building the {@link BatchWrite}. Implementations can mix in interfaces like @@ -28,6 +29,22 @@ @Evolving public interface WriteBuilder { + /** + * Returns a new builder with the `queryId`. `queryId` is a unique string of the query. It's + * possible that there are many queries running at the same time, or a query is restarted and + * resumed. {@link BatchWrite} can use this id to identify the query. + */ + default WriteBuilder withQueryId(String queryId) { + return this; + } + + /** + * Returns a new builder with the schema of the input data to write. + */ + default WriteBuilder withInputDataSchema(StructType schema) { + return this; + } + /** * Returns a {@link BatchWrite} to write data to batch source. 
By default this method throws * exception, data sources must overwrite this method to provide an implementation, if the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index b9cd449f7829..0816ca005ca1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -258,8 +258,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { AppendData.byName(relation, df.logicalPlan) } } else { - val writeBuilder = table.newWriteBuilder( - UUID.randomUUID().toString, df.logicalPlan.schema, dsOptions) + val writeBuilder = table.newWriteBuilder(dsOptions) + .withQueryId(UUID.randomUUID().toString) + .withInputDataSchema(df.logicalPlan.schema) writeBuilder match { case s: SupportsSaveMode => val write = s.mode(mode).buildForBatch() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index f056714aff69..632157818434 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -62,7 +62,9 @@ case class DataSourceV2Relation( def newWriteBuilder(schema: StructType): WriteBuilder = table match { case s: SupportsBatchWrite => val dsOptions = new DataSourceOptions(options.asJava) - s.newWriteBuilder(UUID.randomUUID().toString, schema, dsOptions) + s.newWriteBuilder(dsOptions) + .withQueryId(UUID.randomUUID().toString) + .withInputDataSchema(schema) case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index 61289a981e41..6e4f2bbcd6b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -68,9 +68,15 @@ class SimpleWritableDataSource extends DataSourceV2 override def readSchema(): StructType = tableSchema } - class MyWriteBuilder(path: String, uniqueId: String) extends WriteBuilder with SupportsSaveMode { + class MyWriteBuilder(path: String) extends WriteBuilder with SupportsSaveMode { + private var queryId: String = _ private var mode: SaveMode = _ + override def withQueryId(queryId: String): WriteBuilder = { + this.queryId = queryId + this + } + override def mode(mode: SaveMode): WriteBuilder = { this.mode = mode this @@ -98,7 +104,7 @@ class SimpleWritableDataSource extends DataSourceV2 } val pathStr = hadoopPath.toUri.toString - new MyBatchWrite(uniqueId, pathStr, hadoopConf) + new MyBatchWrite(queryId, pathStr, hadoopConf) } } @@ -145,11 +151,8 @@ class SimpleWritableDataSource extends DataSourceV2 new MyScanBuilder(new Path(path).toUri.toString, conf) } - override def newWriteBuilder( - queryId: String, - schema: StructType, - options: DataSourceOptions): WriteBuilder = { - new MyWriteBuilder(path, queryId) + override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + new MyWriteBuilder(path) } } From 693fb986e54ef8f7d09d6c028d18df50d2db117e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 15 Jan 2019 14:31:38 +0800 
Subject: [PATCH 7/7] address comments

---
 .../sql/sources/v2/writer/SupportsSaveMode.java | 2 +-
 .../sql/sources/v2/writer/WriteBuilder.java | 17 +++++++++++++----
 .../org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
index ad936d7895e1..c4295f237187 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsSaveMode.java
@@ -20,7 +20,7 @@
 import org.apache.spark.sql.SaveMode;

 // A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before
-// Spark 3.0 when all the new write operators are finished.
+// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details.
 public interface SupportsSaveMode extends WriteBuilder {
   WriteBuilder mode(SaveMode mode);
 }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
index 19c6e78c5240..e861c72af9e6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriteBuilder.java
@@ -23,23 +23,32 @@
 import org.apache.spark.sql.types.StructType;

 /**
- * An interface for building the {@link BatchWrite}. Implementations can mix in interfaces like
- * {@link SupportsSaveMode} to support different ways to write data to data sources.
+ * An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to
+ * support different ways to write data to data sources.
+ *
+ * Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder is to
+ * append data without affecting existing data.
  */
 @Evolving
 public interface WriteBuilder {

   /**
-   * Returns a new builder with the `queryId`. `queryId` is a unique string of the query. It's
+   * Passes the `queryId` from Spark to the data source. `queryId` is a unique string of the query. It's
    * possible that there are many queries running at the same time, or a query is restarted and
    * resumed. {@link BatchWrite} can use this id to identify the query.
+   *
+   * @return a new builder with the `queryId`. By default it returns `this`, which means the given
+   * `queryId` is ignored. Please override this method to take the `queryId`.
    */
   default WriteBuilder withQueryId(String queryId) {
     return this;
   }

   /**
-   * Returns a new builder with the schema of the input data to write.
+   * Passes the schema of the input data from Spark to the data source.
+   *
+   * @return a new builder with the `schema`. By default it returns `this`, which means the given
+   * `schema` is ignored. Please override this method to take the `schema`.
*/ default WriteBuilder withInputDataSchema(StructType schema) { return this; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 0816ca005ca1..228dcb94b9ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -273,7 +273,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } case _ => throw new AnalysisException( - s"data source ${table.name} does not support SaveMode") + s"data source ${table.name} does not support SaveMode $mode") } }
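Note for data source authors following this refactor: the sketch below summarizes the shape of the new batch write path on the implementation side. It is illustrative only and not part of the patches above; `ExampleTable`, its table name, and the `createBatchWrite` function parameter are hypothetical stand-ins, and a real source would construct its own BatchWrite (as SimpleWritableDataSource does with MyBatchWrite).

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite, Table}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder}
import org.apache.spark.sql.types.StructType

// A hypothetical table that opts into the refactored batch write path.
// `createBatchWrite` stands in for however a real source builds its BatchWrite.
class ExampleTable(
    tableSchema: StructType,
    createBatchWrite: (String, StructType, SaveMode) => BatchWrite)
  extends Table with SupportsBatchWrite {

  override def name(): String = "example_table"

  override def schema(): StructType = tableSchema

  // Spark calls this once per write, then configures the builder with the query id,
  // the input schema, and (through the temporary SupportsSaveMode mix-in) the SaveMode
  // before calling buildForBatch(), mirroring DataFrameWriter.save above.
  override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
    new WriteBuilder with SupportsSaveMode {
      private var queryId: String = _
      private var inputSchema: StructType = _
      private var saveMode: SaveMode = _

      override def withQueryId(queryId: String): WriteBuilder = {
        this.queryId = queryId
        this
      }

      override def withInputDataSchema(schema: StructType): WriteBuilder = {
        this.inputSchema = schema
        this
      }

      override def mode(mode: SaveMode): WriteBuilder = {
        this.saveMode = mode
        this
      }

      override def buildForBatch(): BatchWrite = {
        // A real implementation would inspect `saveMode` here (and may return null for
        // SaveMode.Ignore, as MyWriteBuilder does) before creating the BatchWrite.
        assert(queryId != null && inputSchema != null && saveMode != null)
        createBatchWrite(queryId, inputSchema, saveMode)
      }
    }
  }
}

With such a table, the resolved write path is table.newWriteBuilder(dsOptions).withQueryId(...).withInputDataSchema(...) followed by mode(...).buildForBatch(), which is the same sequence the DataFrameWriter and DataSourceV2Relation hunks above converge on.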