[SPARK-22026][SQL] data source v2 write path #19269
Changes from 2 commits
WriteSupport.java
@@ -0,0 +1,46 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data writing ability and save the data to the data source.
 */
@InterfaceStability.Evolving
public interface WriteSupport {

  /**
   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
   * sources can return an empty {@link Optional} if no writing is needed for the given save mode.
   *
   * @param schema the schema of the data to be written.
   * @param mode the save mode, which determines what to do when the data already exists in this
   *             data source. Please refer to {@link SaveMode} for more details.
   * @param options the options for the returned data source writer, which is an immutable
   *                case-insensitive string-to-string map.
   */
  Optional<DataSourceV2Writer> createWriter(
      StructType schema, SaveMode mode, DataSourceV2Options options);
}
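
To make the contract concrete, here is a minimal sketch of a source mixing in WriteSupport. All names (SimpleSink, SimpleSinkWriter, dataAlreadyExists) are hypothetical and not part of this PR; SimpleSinkWriter is sketched after DataSourceV2Writer below.

import java.util.Optional;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.types.StructType;

public class SimpleSink implements DataSourceV2, WriteSupport {

  @Override
  public Optional<DataSourceV2Writer> createWriter(
      StructType schema, SaveMode mode, DataSourceV2Options options) {
    if (dataAlreadyExists()) {
      if (mode == SaveMode.Ignore) {
        // Nothing to do for this save mode: an empty Optional tells Spark to
        // skip the write entirely.
        return Optional.empty();
      }
      if (mode == SaveMode.ErrorIfExists) {
        throw new IllegalStateException("output already exists and mode is ErrorIfExists");
      }
    }
    // For Append/Overwrite, return a writer; it decides whether to truncate first.
    return Optional.of(new SimpleSinkWriter(schema, mode == SaveMode.Overwrite));
  }

  // Hypothetical helper: checks the underlying storage for existing output.
  private boolean dataAlreadyExists() {
    return false;
  }
}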
DataSourceV2Writer.java
@@ -0,0 +1,88 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.types.StructType;

/**
 * A data source writer that is returned by
 * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
 * It can mix in various writing optimization interfaces to speed up data saving. The actual
 * writing logic is delegated to {@link DataWriter}.
 *
 * The writing procedure is:
 *   1. Create a writer factory by {@link #createWriterFactory()}, serialize it, and send it to
 *      all the partitions of the input data (RDD).
 *   2. For each partition, create a data writer and write the data of the partition with this
 *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
 *      an exception happens during the writing, call {@link DataWriter#abort()}.
 *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}.
 *      If some writers are aborted, or the job fails for an unknown reason, call
 *      {@link #abort(WriterCommitMessage[])}.
 *
 * Spark won't retry failed writing jobs; users should do it manually in their Spark applications
 * if they want to retry.
 *
 * Please refer to the documentation of the commit/abort methods for detailed specifications.
 *
 * Note that this interface provides a protocol between Spark and data sources for transactional
 * data writing, but the transaction here is a Spark-level transaction, which may not be an
 * underlying storage transaction. For example, Spark may successfully write data to a Cassandra
 * data source, but Cassandra may need some more time to reach consistency at the storage level.
 */
@InterfaceStability.Evolving
public interface DataSourceV2Writer {

  /**
   * Creates a writer factory which will be serialized and sent to executors.
   */
  DataWriterFactory<Row> createWriterFactory();

  /**
   * Commits this writing job with a list of commit messages. The commit messages are collected
   * from successful data writers and are produced by {@link DataWriter#commit()}. If this method
   * fails (by throwing an exception), this writing job is considered to have failed, and
   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be
   * visible to data source readers if this method succeeds.
   *
   * Note that one partition may have multiple committed data writers because of speculative
   * tasks. Spark will pick the first successful one and get its commit message. Implementations
   * should be aware of this and handle it correctly, e.g., have a mechanism to make sure only
   * one data writer can commit successfully, or have a way to clean up the data of
   * already-committed writers.
   */
  void commit(WriterCommitMessage[] messages);

  /**
   * Aborts this writing job because some data writers failed to write the records and aborted,
   * or the Spark job failed for some unknown reason, or {@link #commit(WriterCommitMessage[])}
   * failed. If this method fails (by throwing an exception), the underlying data source may be
   * left with garbage that needs to be cleaned up manually, but this garbage should not be
   * visible to data source readers.
   *
   * Unless the abort is triggered by the failure of commit, the given messages may have some
   * null slots, as there may be only a few data writers that committed before the abort
   * happens, or some data writers were committed but their commit messages haven't reached the
   * driver when the abort is triggered. So this is just a "best effort" for data sources to
   * clean up the data left by data writers.
   */
  void abort(WriterCommitMessage[] messages);
}
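
Continuing the hypothetical SimpleSink sketch, a driver-side writer following the commit/abort contract above: each task stages its output under a private path and reports that path in its commit message; commit publishes all staged output, abort deletes whatever was staged. SimpleWriterFactory and SimpleCommitMessage are sketched after their interfaces below; publish and deleteStaged are hypothetical stand-ins for real storage operations.

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class SimpleSinkWriter implements DataSourceV2Writer {
  private final StructType schema;
  private final boolean overwrite;  // truncate-before-publish handling is elided

  public SimpleSinkWriter(StructType schema, boolean overwrite) {
    this.schema = schema;
    this.overwrite = overwrite;
  }

  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    // The factory is serialized and shipped to the executors.
    return new SimpleWriterFactory("/tmp/simple-sink-staging");
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // All tasks succeeded: make the staged output visible. Each message
    // carries the staging path written by one committed task.
    for (WriterCommitMessage m : messages) {
      publish(((SimpleCommitMessage) m).stagingPath());
    }
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Best effort: slots are null for tasks whose commit never reached the driver.
    for (WriterCommitMessage m : messages) {
      if (m != null) {
        deleteStaged(((SimpleCommitMessage) m).stagingPath());
      }
    }
  }

  // Hypothetical helpers standing in for real storage operations.
  private void publish(String stagingPath) { /* e.g. rename into the final location */ }
  private void deleteStaged(String stagingPath) { /* e.g. recursively delete */ }
}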
DataWriter.java
@@ -0,0 +1,92 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A data writer returned by {@link DataWriterFactory#createWriter(int, int, int)}, responsible
 * for writing data for an input RDD partition.
 *
 * One Spark task has one exclusive data writer, so there is no thread-safety concern.
 *
 * {@link #write(Object)} is called for each record in the input RDD partition. If one record
 * fails in {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining
 * records will not be processed. If all records are successfully written, {@link #commit()} is
 * called.
 *
 * If this data writer succeeds (all records are successfully written and {@link #commit()}
 * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and passed to
 * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} along with commit messages from other
 * data writers. If this data writer fails (one record fails to write, or {@link #commit()}
 * fails), an exception will be sent to the driver side, and Spark will retry this writing task
 * a few times. Each retry calls {@link DataWriterFactory#createWriter(int, int, int)} with a
 * different `attemptNumber`, and Spark finally calls
 * {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if all retries fail.
 *
 * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
 * takes too long to finish. Different from retried tasks, which are launched one by one after
 * the previous one fails, speculative tasks run simultaneously. It's possible that one input
 * RDD partition has multiple data writers with different `attemptNumber` running at the same
 * time, and data sources should guarantee that these data writers don't conflict and can work
 * together. Implementations can coordinate with the driver during {@link #commit()} to make
 * sure only one of these data writers can commit successfully, or they can allow all of them to
 * commit successfully and have a way to revert committed data writers without the commit
 * message, because Spark only accepts the commit message that arrives first and ignores others.
 *
 * Note that currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
 * source writers, or {@link org.apache.spark.sql.catalyst.InternalRow} for data source writers
 * that mix in {@link SupportsWriteInternalRow}.
 */
@InterfaceStability.Evolving
public interface DataWriter<T> {

  /**
   * Writes one record.
   *
   * If this method fails (by throwing an exception), {@link #abort()} will be called and this
   * data writer is considered to have failed.
   */
  void write(T record);

  /**
   * Commits this writer after all records are written successfully, and returns a commit
   * message which will be sent back to the driver side and passed to
   * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
   *
   * The written data should only be visible to data source readers after
   * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
   * should still "hide" the written data and ask the {@link DataSourceV2Writer} at the driver
   * side to do the final commit via {@link WriterCommitMessage}.
   *
   * If this method fails (by throwing an exception), {@link #abort()} will be called and this
   * data writer is considered to have failed.
   */
  WriterCommitMessage commit();

  /**
   * Aborts this writer if it has failed. Implementations should clean up the data of the
   * already written records.
   *
   * This method will only be called if one record failed to be written, or {@link #commit()}
   * failed.
   *
   * If this method fails (by throwing an exception), the underlying data source may be left
   * with garbage that needs to be cleaned up by
   * {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or manually, but this garbage
   * should not be visible to data source readers.
   */
  void abort();
}

Member (review comment on the speculative-tasks paragraph above): In the test case, could we implement the above logic?
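
An executor-side writer for the same sketch, keeping to the contract above: records are streamed to a task-private staging file that readers cannot see, commit() closes the file and hands its path to the driver inside a SimpleCommitMessage (sketched after WriterCommitMessage below), and abort() deletes any partial output. The comma-joined text encoding is purely illustrative.

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class SimpleDataWriter implements DataWriter<Row> {
  private final Path stagingFile;
  private final BufferedWriter out;

  public SimpleDataWriter(String stagingPath) {
    try {
      this.stagingFile = Paths.get(stagingPath);
      Files.createDirectories(stagingFile.getParent());
      this.out = Files.newBufferedWriter(stagingFile);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public void write(Row record) {
    try {
      // Stream each record to the staging file; it is not yet visible to readers.
      out.write(record.mkString(","));
      out.newLine();
    } catch (IOException e) {
      // Throwing makes Spark call abort() and fail (or retry) this task.
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public WriterCommitMessage commit() {
    try {
      out.close();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // The data stays hidden; the driver publishes it in DataSourceV2Writer#commit.
    return new SimpleCommitMessage(stagingFile.toString());
  }

  @Override
  public void abort() {
    try {
      out.close();
      Files.deleteIfExists(stagingFile);  // remove partial output
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}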
DataWriterFactory.java
@@ -0,0 +1,44 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A factory of {@link DataWriter} returned by {@link DataSourceV2Writer#createWriterFactory()},
 * which is responsible for creating and initializing the actual data writers at the executor
 * side.
 *
 * Note that the writer factory will be serialized and sent to the executors, and the data
 * writers will then be created on the executors to do the actual writing. So
 * {@link DataWriterFactory} must be serializable, while {@link DataWriter} doesn't need to be.
 */
@InterfaceStability.Evolving
public interface DataWriterFactory<T> extends Serializable {

  /**
   * Returns a data writer to do the actual writing work.
   *
   * @param stageId The id of the Spark stage that runs the returned writer.
   * @param partitionId The id of the RDD partition that the returned writer will process.
   * @param attemptNumber The attempt number of the Spark task that runs the returned writer,
   *                      which is usually 0 if the task is not a retried task or a speculative
   *                      task.
   */
  DataWriter<T> createWriter(int stageId, int partitionId, int attemptNumber);
}
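
The matching factory for the sketch above; its only state is a String, so serialization is trivial, and the staging path encodes stageId, partitionId, and attemptNumber so retried and speculative attempts of the same partition never write to the same location.

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;

public class SimpleWriterFactory implements DataWriterFactory<Row> {
  // Serialized and sent to executors, so keep the state small and serializable.
  private final String stagingRoot;

  public SimpleWriterFactory(String stagingRoot) {
    this.stagingRoot = stagingRoot;
  }

  @Override
  public DataWriter<Row> createWriter(int stageId, int partitionId, int attemptNumber) {
    // Encode all three ids in the path so concurrent attempts never collide.
    String path = String.format(
        "%s/stage-%d/part-%d-attempt-%d", stagingRoot, stageId, partitionId, attemptNumber);
    return new SimpleDataWriter(path);
  }
}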
SupportsWriteInternalRow.java
@@ -0,0 +1,44 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;

/**
 * A mix-in interface for {@link DataSourceV2Writer}. Data source writers can implement this
 * interface to write {@link InternalRow} directly and avoid the row conversion on the Spark
 * side. This is an experimental and unstable interface, as {@link InternalRow} is not public
 * and may change in future Spark versions.
 */
@InterfaceStability.Evolving
@Experimental
@InterfaceStability.Unstable
public interface SupportsWriteInternalRow extends DataSourceV2Writer {

  @Override
  default DataWriterFactory<Row> createWriterFactory() {
    throw new IllegalStateException(
        "createWriterFactory should not be called with SupportsWriteInternalRow.");
  }

  DataWriterFactory<InternalRow> createInternalRowWriterFactory();
}
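
A sketch of how a writer might opt in (all names hypothetical): only createInternalRowWriterFactory() is implemented, relying on the default createWriterFactory() above to throw if Spark ever takes the Row-based path. The inline writer merely counts rows so the example stays self-contained; SimpleCommitMessage is sketched below.

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class InternalRowSinkWriter implements SupportsWriteInternalRow {

  @Override
  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
    // The lambda is serializable because DataWriterFactory extends Serializable.
    return (stageId, partitionId, attemptNumber) -> new DataWriter<InternalRow>() {
      private long count = 0;

      @Override
      public void write(InternalRow record) {
        count++;  // consume the internal row directly, no Row conversion
      }

      @Override
      public WriterCommitMessage commit() {
        return new SimpleCommitMessage("part-" + partitionId + "-rows-" + count);
      }

      @Override
      public void abort() {
        count = 0;  // nothing durable was written in this toy example
      }
    };
  }

  @Override
  public void commit(WriterCommitMessage[] messages) { /* publish staged data */ }

  @Override
  public void abort(WriterCommitMessage[] messages) { /* clean up staged data */ }
}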
WriterCommitMessage.java
@@ -0,0 +1,33 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A commit message returned by {@link DataWriter#commit()} and sent back to the driver side as
 * an input parameter of {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
 *
 * This is an empty interface; data sources should define their own message class and use it in
 * their {@link DataWriter#commit()} and {@link DataSourceV2Writer#commit(WriterCommitMessage[])}
 * implementations.
 */
@InterfaceStability.Evolving
public interface WriterCommitMessage extends Serializable {}
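
Finally, the concrete commit message used throughout the sketches above (hypothetical, not part of this PR): a plain String holder, so it serializes trivially and carries exactly what the driver needs to publish or clean up one task's staged output.

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class SimpleCommitMessage implements WriterCommitMessage {
  // The staging location one task wrote to; the driver either publishes it in
  // DataSourceV2Writer#commit or deletes it in DataSourceV2Writer#abort.
  private final String stagingPath;

  public SimpleCommitMessage(String stagingPath) {
    this.stagingPath = stagingPath;
  }

  public String stagingPath() {
    return stagingPath;
  }
}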
Review comment: successes -> succeeds. You might need to check all the other cases.