@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.types.{DataType, StructType}
@@ -42,8 +42,8 @@ case class AvroTable(
override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
new AvroWriteBuilder(options, paths, formatName, supportsDataType)
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new AvroWriteBuilder(paths, formatName, supportsDataType, info)

override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)

@@ -19,18 +19,18 @@ package org.apache.spark.sql.v2.avro
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.avro.AvroUtils
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class AvroWriteBuilder(
options: CaseInsensitiveStringMap,
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean)
extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo)
extends FileWriteBuilder(paths, formatName, supportsDataType, info) {
override def prepareWrite(
sqlConf: SQLConf,
job: Job,
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -392,18 +392,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
() => new KafkaScan(options)

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
new WriteBuilder {
private var inputSchema: StructType = _
private val options = info.options
private val inputSchema: StructType = info.schema()
private val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
private val producerParams =
kafkaParamsForProducer(CaseInsensitiveMap(options.asScala.toMap))

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def buildForBatch(): BatchWrite = {
assert(inputSchema != null)
new KafkaBatchWrite(topic, producerParams, inputSchema)
@@ -21,8 +21,8 @@

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* Represents a table which is staged for being committed to the metastore.
@@ -32,10 +32,10 @@
* {@link StagingTableCatalog#stageCreate(Identifier, StructType, Transform[], Map)} or
* {@link StagingTableCatalog#stageReplace(Identifier, StructType, Transform[], Map)} to prepare the
* table for being written to. This table should usually implement {@link SupportsWrite}. A new
* writer will be constructed via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)},
* and the write will be committed. The job concludes with a call to {@link #commitStagedChanges()},
* at which point implementations are expected to commit the table's metadata into the metastore
* along with the data that was written by the writes from the write builder this table created.
* writer will be constructed via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}, and the
* write will be committed. The job concludes with a call to {@link #commitStagedChanges()}, at
* which point implementations are expected to commit the table's metadata into the metastore along
* with the data that was written by the writes from the write builder this table created.
*/
@Experimental
public interface StagedTable extends Table {
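The javadoc above (together with the StagingTableCatalog javadoc in the next file) describes the staged-write sequence for atomic CTAS/RTAS. Below is a minimal driver-side sketch of that sequence against the new newWriteBuilder signature; the object and method names are hypothetical, and only the catalog and write APIs touched by this diff are assumed.

import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, SupportsWrite}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.LogicalWriteInfo

object StagedWriteSketch {
  // Sketch of atomic CTAS: stage the table, write through a builder created from
  // LogicalWriteInfo, and expose metadata + data only once commitStagedChanges() succeeds.
  def createTableAsSelect(
      catalog: StagingTableCatalog,
      ident: Identifier,
      partitions: Array[Transform],
      properties: java.util.Map[String, String],
      info: LogicalWriteInfo): Unit = {
    val staged: StagedTable = catalog.stageCreate(ident, info.schema(), partitions, properties)
    try {
      val writeBuilder = staged.asInstanceOf[SupportsWrite].newWriteBuilder(info)
      // ... execute the job from writeBuilder.buildForBatch() and commit its WriterCommitMessages ...
      staged.commitStagedChanges()   // table metadata and data become visible together
    } catch {
      case t: Throwable =>
        staged.abortStagedChanges()  // roll back; nothing reached the metastore
        throw t
    }
  }
}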
@@ -21,13 +21,13 @@

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* An optional mix-in for implementations of {@link TableCatalog} that support staging creation of
@@ -39,9 +39,9 @@
* TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first
* drop the table via {@link TableCatalog#dropTable(Identifier)}, then create the table via
* {@link TableCatalog#createTable(Identifier, StructType, Transform[], Map)}, and then perform
* the write via {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)}. However, if the
* write operation fails, the catalog will have already dropped the table, and the planner cannot
* roll back the dropping of the table.
* the write via {@link SupportsWrite#newWriteBuilder(LogicalWriteInfo)}.
* However, if the write operation fails, the catalog will have already dropped the table, and the
* planner cannot roll back the dropping of the table.
* <p>
* If the catalog implements this plugin, the catalog can implement the methods to "stage" the
* creation and the replacement of a table. After the table's
@@ -19,13 +19,13 @@

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* A mix-in interface of {@link Table}, to indicate that it's writable. This adds
* {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* for batch or streaming.
* {@link #newWriteBuilder(LogicalWriteInfo)} that is used to create a
* write for batch or streaming.
*/
@Experimental
public interface SupportsWrite extends Table {
@@ -34,5 +34,5 @@ public interface SupportsWrite extends Table {
* Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call
* this method to configure each data source write.
*/
WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options);
WriteBuilder newWriteBuilder(LogicalWriteInfo info);
}
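For connector authors, only the builder's entry point changes: the options, input schema and query id now arrive together instead of through later callbacks. A minimal sketch of a table implementing the new contract, assuming a hypothetical class name and leaving the builder body at the interface defaults:

import java.util
import scala.collection.JavaConverters._

import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

class SingleColumnTable extends Table with SupportsWrite {
  override def name(): String = "single-column-table"
  override def schema(): StructType = new StructType().add("value", "string")
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.BATCH_WRITE).asJava

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // The input schema is already known here, so the table can validate it eagerly
    // instead of waiting for a withInputDataSchema callback.
    require(info.schema().length == 1,
      s"table ${name()} accepts exactly one column, got: ${info.schema().simpleString}")
    new WriteBuilder {} // buildForBatch/buildForStreaming left at their defaults in this sketch
  }
}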
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* This interface contains logical write information that data sources can use when generating a
* {@link WriteBuilder}.
*/
@Evolving
public interface LogicalWriteInfo {
/**
* the options that the user specified when writing the dataset
*/
CaseInsensitiveStringMap options();

/**
* `queryId` is a unique string identifying the query. It's possible that there are many queries
* running at the same time, or a query is restarted and resumed. {@link BatchWrite} can use
* this id to identify the query.
*/
String queryId();

/**
* the schema of the input data that Spark passes to the data source.
*/
StructType schema();
}
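A minimal end-to-end sketch of a connector consuming this interface — a hypothetical "discarding" sink, not part of this change — showing the builder reading query id, schema and options at construction time and returning a no-op BatchWrite:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.types.StructType

class DiscardingWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {
  // All logical write information arrives up front; no builder callbacks are needed.
  private val queryId: String = info.queryId()
  private val schema: StructType = info.schema()
  private val verbose: Boolean = info.options().getBoolean("verbose", false)

  override def buildForBatch(): BatchWrite = new BatchWrite {
    override def createBatchWriterFactory(p: PhysicalWriteInfo): DataWriterFactory =
      DiscardingWriterFactory
    override def commit(messages: Array[WriterCommitMessage]): Unit =
      if (verbose) println(s"query $queryId discarded rows with ${schema.length} columns")
    override def abort(messages: Array[WriterCommitMessage]): Unit = ()
  }
}

object DiscardingWriterFactory extends DataWriterFactory {
  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] =
    new DataWriter[InternalRow] {
      override def write(record: InternalRow): Unit = ()   // drop the row
      override def commit(): WriterCommitMessage = DiscardingCommit
      override def abort(): Unit = ()
      override def close(): Unit = ()
    }
}

case object DiscardingCommit extends WriterCommitMessage

Capturing these values in vals also removes the mutable `private var` fields that the Kafka and file-source builders needed before this change.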
@@ -21,7 +21,6 @@
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.apache.spark.sql.types.StructType;

/**
* An interface for building the {@link BatchWrite}. Implementations can mix in some interfaces to
@@ -33,28 +32,6 @@
@Evolving
public interface WriteBuilder {

/**
* Passes the `queryId` from Spark to data source. `queryId` is a unique string of the query. It's
* possible that there are many queries running at the same time, or a query is restarted and
* resumed. {@link BatchWrite} can use this id to identify the query.
*
* @return a new builder with the `queryId`. By default it returns `this`, which means the given
* `queryId` is ignored. Please override this method to take the `queryId`.
*/
default WriteBuilder withQueryId(String queryId) {
return this;
}

/**
* Passes the schema of the input data from Spark to data source.
*
* @return a new builder with the `schema`. By default it returns `this`, which means the given
* `schema` is ignored. Please override this method to take the `schema`.
*/
default WriteBuilder withInputDataSchema(StructType schema) {
return this;
}

/**
* Returns a {@link BatchWrite} to write data to batch source. By default this method throws
* exception, data sources must overwrite this method to provide an implementation, if the
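With the two setters removed, driver-side code builds the logical write info once and hands it to the table, as the V1FallbackWriters hunk further down does. A sketch of that call pattern — the helper is hypothetical, and LogicalWriteInfoImpl is the private[sql] case class added just below, so this shape only applies inside Spark's own sql packages:

import java.util.UUID

import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object WriteBuilderSketch {
  // before: table.newWriteBuilder(options).withQueryId(...).withInputDataSchema(schema)
  // after:  bundle queryId, schema and options into one immutable value and pass it once
  def newWriteBuilder(
      table: SupportsWrite,
      schema: StructType,
      options: CaseInsensitiveStringMap): WriteBuilder = {
    val info = LogicalWriteInfoImpl(
      queryId = UUID.randomUUID().toString,
      schema = schema,
      options = options)
    table.newWriteBuilder(info)
  }
}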
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.write

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

private[sql] case class LogicalWriteInfoImpl(
queryId: String,
schema: StructType,
options: CaseInsensitiveStringMap) extends LogicalWriteInfo
@@ -95,8 +95,8 @@ class InMemoryTable(
override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
}

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
InMemoryTable.maybeSimulateFailedTableWrite(options)
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
InMemoryTable.maybeSimulateFailedTableWrite(info.options)

new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
private var writer: BatchWrite = Append
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -88,8 +88,8 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable

override def capabilities(): util.Set[TableCapability] = delegateTable.capabilities

override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
delegateTable.newWriteBuilder(options)
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
delegateTable.newWriteBuilder(info)
}

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
@@ -39,7 +39,7 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
}

private[noop] object NoopTable extends Table with SupportsWrite {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()
override def capabilities(): util.Set[TableCapability] = {
@@ -30,34 +30,24 @@ import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.{BatchWrite, WriteBuilder}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.SerializableConfiguration

abstract class FileWriteBuilder(
options: CaseInsensitiveStringMap,
paths: Seq[String],
formatName: String,
supportsDataType: DataType => Boolean) extends WriteBuilder {
private var schema: StructType = _
private var queryId: String = _
supportsDataType: DataType => Boolean,
info: LogicalWriteInfo) extends WriteBuilder {
private val schema = info.schema()
private val queryId = info.queryId()
private val options = info.options()
private var mode: SaveMode = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.schema = schema
this
}

override def withQueryId(queryId: String): WriteBuilder = {
this.queryId = queryId
this
}

def mode(mode: SaveMode): WriteBuilder = {
this.mode = mode
this
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.write.{SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.{AlwaysTrue, Filter, InsertableRelation}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -98,9 +98,12 @@ sealed trait V1FallbackWriters extends SupportsV1Write {
}

protected def newWriteBuilder(): V1WriteBuilder = {
val writeBuilder = table.newWriteBuilder(writeOptions)
.withInputDataSchema(plan.schema)
.withQueryId(UUID.randomUUID().toString)
val info = LogicalWriteInfoImpl(
queryId = UUID.randomUUID().toString,
schema = plan.schema,
options = writeOptions)
val writeBuilder = table.newWriteBuilder(info)

writeBuilder.asV1Builder
}
}