DataSourceV2.java:
@@ -20,15 +20,7 @@
import org.apache.spark.annotation.Evolving;

/**
* The base interface for data source v2. Implementations must have a public, 0-arg constructor.
*
* Note that this is an empty interface. Data source implementations must mix in interfaces such as
* {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
* batch or streaming read/write support instances. Otherwise it's just a dummy data source which
* is un-readable/writable.
*
* If Spark fails to execute any methods in the implementations of this interface (by throwing an
* exception), the read action will fail and no Spark job will be submitted.
* TODO: remove it when we finish the API refactor for streaming side.
*/
@Evolving
public interface DataSourceV2 {}
SupportsBatchWrite.java (new file):
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

/**
* An empty mix-in interface for {@link Table}, to indicate that this table supports batch write.
* <p>
* If a {@link Table} implements this interface, its
* {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} method must return a {@link WriteBuilder}
* with {@link WriteBuilder#buildForBatch()} implemented.
* </p>
*/
@Evolving
public interface SupportsBatchWrite extends SupportsWrite {}
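To make the contract concrete, here is a minimal sketch (not part of this diff) of a table that mixes in this interface. `ExampleTable` and `ExampleBatchWrite` are hypothetical names, and `Table`'s `name()`/`schema()` methods are assumed from the surrounding v2 API:

```java
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.types.StructType;

class ExampleTable implements SupportsBatchWrite {
  @Override
  public String name() { return "example"; }

  @Override
  public StructType schema() { return new StructType(); }

  @Override
  public WriteBuilder newWriteBuilder(DataSourceOptions options) {
    // Per the contract above, the returned builder implements buildForBatch().
    return new WriteBuilder() {
      @Override
      public BatchWrite buildForBatch() {
        return new ExampleBatchWrite(); // hypothetical; sketched later in this diff
      }
    };
  }
}
```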
SupportsRead.java:
@@ -29,7 +29,7 @@ interface SupportsRead extends Table {

/**
* Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this
* method to configure each scan.
* method to configure each data source scan.
*
* @param options The options for reading, which is an immutable case-insensitive
* string-to-string map.
*/
ScanBuilder newScanBuilder(DataSourceOptions options);
}
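As a rough illustration of this method (a sketch, not from this diff — it assumes `ScanBuilder` declares a single `build()` method so a lambda suffices; `ExampleScan` and the `path` option are hypothetical):

```java
// Inside a hypothetical Table that mixes in SupportsRead:
@Override
public ScanBuilder newScanBuilder(DataSourceOptions options) {
  // The options map is case-insensitive; get() returns an Optional.
  String path = options.get("path").orElse("/tmp/example"); // "path" is illustrative
  return () -> new ExampleScan(path); // ScanBuilder.build() as a lambda
}
```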
SupportsWrite.java (new file):
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

/**
* An internal base interface for the mix-in interfaces of a writable {@link Table}. It adds
* {@link #newWriteBuilder(DataSourceOptions)}, which is used to create a write
* for batch or streaming.
*/
interface SupportsWrite extends Table {

/**
* Returns a {@link WriteBuilder} which can be used to create a {@link BatchWrite}. Spark will call
* this method to configure each data source write.
*/
WriteBuilder newWriteBuilder(DataSourceOptions options);
}
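A sketch of how a caller might drive this method, assuming `table`, `options`, and `schema` are in scope (the `withQueryId`/`withInputDataSchema` calls are the `WriteBuilder` hooks added later in this diff):

```java
WriteBuilder builder = table.newWriteBuilder(options)
    .withQueryId(java.util.UUID.randomUUID().toString())
    .withInputDataSchema(schema);
BatchWrite write = builder.buildForBatch();
```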
BatchWriteSupport.java → BatchWrite.java:
@@ -38,7 +38,7 @@
* Please refer to the documentation of commit/abort methods for detailed specifications.
*/
@Evolving
public interface BatchWriteSupport {
public interface BatchWrite {

/**
* Creates a writer factory which will be serialized and sent to executors.
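A minimal hypothetical implementation of the renamed interface, to show where the driver-side commit protocol lives (`ExampleBatchWrite` and `ExampleWriterFactory` are illustrative names, not part of this diff):

```java
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class ExampleBatchWrite implements BatchWrite {
  @Override
  public DataWriterFactory createBatchWriterFactory() {
    return new ExampleWriterFactory(); // serialized and sent to executors
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Make the output of all tasks visible atomically, e.g. move staged
    // files into the final location recorded in each message.
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Best-effort cleanup of anything the tasks may have written.
  }
}
```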
DataWriter.java:
@@ -36,11 +36,11 @@
*
* If this data writer succeeds (all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and passed to
* {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data
* {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails (one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark may retry this writing task a few times.
* In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
* different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])}
* different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])}
* when the configured number of retries is exhausted.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -71,11 +71,11 @@ public interface DataWriter<T> {
/**
* Commits this writer after all records are written successfully, and returns a commit message
* which will be sent back to the driver side and passed to
* {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
* {@link BatchWrite#commit(WriterCommitMessage[])}.
*
* The written data should only be visible to data source readers after
* {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to
* {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link BatchWrite} at driver side to
* do the final commit via {@link WriterCommitMessage}.
*
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -93,7 +93,7 @@ public interface DataWriter<T> {
* failed.
*
* If this method fails (by throwing an exception), the underlying data source may have garbage
* that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
* that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually,
* but this garbage should not be visible to data source readers.
*
* @throws IOException if failure happens during disk/network IO like writing files.
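A hypothetical writer (not from this diff) that follows the contract described above: stage data while writing, and expose it only through the commit message so {@link BatchWrite#commit} can do the final, atomic commit on the driver:

```java
import java.io.IOException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class ExampleDataWriter implements DataWriter<InternalRow> {
  private final String stagingPath; // hypothetical per-attempt staging location

  ExampleDataWriter(String stagingPath) {
    this.stagingPath = stagingPath;
  }

  @Override
  public void write(InternalRow record) throws IOException {
    // Stream the record to the staging location; the data stays hidden
    // from readers until the driver-side commit.
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // Flush and close, then tell the driver where the staged data lives.
    return new ExampleCommitMessage(stagingPath); // sketched later in this diff
  }

  @Override
  public void abort() throws IOException {
    // Delete the staged data; nothing written here may become visible.
  }
}
```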
DataWriterFactory.java:
@@ -24,7 +24,7 @@
import org.apache.spark.sql.catalyst.InternalRow;

/**
* A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()},
* A factory of {@link DataWriter} returned by {@link BatchWrite#createBatchWriterFactory()},
* which is responsible for creating and initializing the actual data writer at the executor side.
*
* Note that the writer factory will be serialized and sent to executors, then the data writer
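A matching hypothetical factory. Since the factory is shipped to executors it must be serializable (the interface is assumed to extend `java.io.Serializable`); note how a retried task gets a distinct staging location because `taskId` differs per attempt:

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;

class ExampleWriterFactory implements DataWriterFactory {
  @Override
  public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
    // taskId changes on retry, so each attempt stages to a unique path.
    return new ExampleDataWriter("/tmp/staging/" + partitionId + "/" + taskId);
  }
}
```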
SupportsSaveMode.java (new file):
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.sql.SaveMode;

// A temporary mixin trait for `WriteBuilder` to support `SaveMode`. Will be removed before
// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details.
public interface SupportsSaveMode extends WriteBuilder {
WriteBuilder mode(SaveMode mode);
}
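A sketch of a builder mixing in this temporary trait; the `Ignore` handling below is one hypothetical use of the null return discussed in `WriteBuilder#buildForBatch`, and `ExampleBatchWrite` is the illustrative class sketched earlier:

```java
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

class ExampleSaveModeBuilder implements SupportsSaveMode {
  private SaveMode mode = SaveMode.ErrorIfExists;

  @Override
  public WriteBuilder mode(SaveMode mode) {
    this.mode = mode;
    return this;
  }

  @Override
  public BatchWrite buildForBatch() {
    if (mode == SaveMode.Ignore /* and the target already exists */) {
      return null; // null signals that no writing is needed
    }
    return new ExampleBatchWrite();
  }
}
```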
WriteBuilder.java (new file):
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.types.StructType;

/**
* An interface for building a {@link BatchWrite}. Implementations can mix in interfaces to
* support different ways to write data to data sources.
*
* Unless modified by a mix-in interface, the {@link BatchWrite} configured by this builder
* appends data without affecting existing data.
*/
@Evolving
public interface WriteBuilder {
Contributor (review comment): I think this documentation needs to state that unless otherwise modified by a mix-in, the write that is configured by this builder is to append data without affecting existing data.
/**
* Passes the `queryId` from Spark to the data source. `queryId` is a unique string identifying
* the query. Many queries may run at the same time, and a query may be restarted and resumed;
* {@link BatchWrite} can use this id to identify the query.
*
* @return a new builder with the `queryId`. By default it returns `this`, which means the given
* `queryId` is ignored. Override this method to handle the `queryId`.
*/
default WriteBuilder withQueryId(String queryId) {
return this;
}

/**
* Passes the schema of the input data from Spark to the data source.
*
* @return a new builder with the `schema`. By default it returns `this`, which means the given
* `schema` is ignored. Override this method to handle the `schema`.
*/
default WriteBuilder withInputDataSchema(StructType schema) {
return this;
}

/**
* Returns a {@link BatchWrite} to write data to a batch source. By default this method throws an
* exception; data sources must override it to provide an implementation if the
* {@link Table} that creates this builder implements {@link SupportsBatchWrite}.
*
* Note that the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean this up after removing
* {@link SupportsSaveMode}.
Contributor (review comment): Returning null for now sounds fine to me.
*/
default BatchWrite buildForBatch() {
throw new UnsupportedOperationException("Batch writes are not supported");
}
}
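For illustration, a hypothetical builder (not part of this diff) that overrides the defaults above instead of ignoring the values passed in:

```java
import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.types.StructType;

class ExampleWriteBuilder implements WriteBuilder {
  private String queryId;
  private StructType schema;

  @Override
  public WriteBuilder withQueryId(String queryId) {
    this.queryId = queryId; // e.g. used to tag output for idempotent restarts
    return this;
  }

  @Override
  public WriteBuilder withInputDataSchema(StructType schema) {
    this.schema = schema; // e.g. used to validate against the table schema
    return this;
  }

  @Override
  public BatchWrite buildForBatch() {
    return new ExampleBatchWrite(); // hypothetical, sketched earlier
  }
}
```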
WriterCommitMessage.java:
@@ -24,7 +24,7 @@

/**
* A commit message returned by {@link DataWriter#commit()}, which will be sent back to the driver side
* as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or
* as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
* {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
*
* This is an empty interface, data sources should define their own message class and use it when
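A hypothetical message class of the kind the documentation describes (the interface is assumed to extend `java.io.Serializable`, since messages travel from executors to the driver):

```java
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class ExampleCommitMessage implements WriterCommitMessage {
  final String stagedPath; // where the task staged its output

  ExampleCommitMessage(String stagedPath) {
    this.stagedPath = stagedPath;
  }
}
```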
DataFrameReader.scala:
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
case _ => provider.getTable(dsOptions)
}
table match {
case s: SupportsBatchRead =>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema))

case _: SupportsBatchRead =>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
case _ => loadV1Source(paths: _*)
}
} else {
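For context, the SupportsBatchRead branch above is what an ordinary v2 read resolves through. A usage sketch in Java, assuming a SparkSession `spark` is in scope and using a hypothetical source name:

```java
Dataset<Row> df = spark.read()
    .format("exampleSource")        // hypothetical v2 data source
    .option("path", "/tmp/example") // hypothetical option
    .load();                        // resolves the table; SupportsBatchRead
                                    // tables become a DataSourceV2Relation
```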