From 86349110bddfabbf5fdb185aa1926c88297506f6 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 21 Jan 2019 14:59:00 -0800 Subject: [PATCH 1/9] DSv2: Support overwrite and dynamic partition overwrite. --- .../sql/catalyst/analysis/Analyzer.scala | 20 ++++ .../plans/logical/basicLogicalOperators.scala | 63 +++++++++-- .../v2/writer/SupportsDynamicOverwrite.java | 37 +++++++ .../sources/v2/writer/SupportsOverwrite.java | 36 ++++++ .../apache/spark/sql/DataFrameWriter.scala | 54 +++++---- .../v2/DataSourceV2Implicits.scala | 49 +++++++++ .../datasources/v2/DataSourceV2Relation.scala | 24 +--- .../datasources/v2/DataSourceV2Strategy.scala | 36 +++--- .../v2/WriteToDataSourceV2Exec.scala | 104 +++++++++++++++++- .../sql/sources/v2/DataSourceV2Suite.scala | 8 +- 10 files changed, 360 insertions(+), 71 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 793c337ffcb1..5707161cd36c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2254,6 +2254,26 @@ class Analyzer( } else { append } + + case overwrite @ OverwriteByExpression(table, _, query, isByName) + if table.resolved && query.resolved && !overwrite.resolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + overwrite.copy(query = projection) + } else { + overwrite + } + + case overwrite @ OverwritePartitionsDynamic(table, query, isByName) + if table.resolved && query.resolved && !overwrite.resolved => + val projection = resolveOutputColumns(table.name, table.output, query, isByName) + + if (projection != query) { + overwrite.copy(query = projection) + } else { + overwrite + } } def resolveOutputColumns( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 639d68f4ecd7..82e4b90979ec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -365,14 +365,13 @@ case class Join( } /** - * Append data to an existing table. + * Base trait for DataSourceV2 write commands */ -case class AppendData( - table: NamedRelation, - query: LogicalPlan, - isByName: Boolean) extends LogicalPlan { +private[sql] trait V2WriteCommand extends Command { + def table: NamedRelation + def query: LogicalPlan + override def children: Seq[LogicalPlan] = Seq(query) - override def output: Seq[Attribute] = Seq.empty override lazy val resolved: Boolean = { table.resolved && query.resolved && query.output.size == table.output.size && @@ -386,16 +385,64 @@ case class AppendData( } } +/** + * Append data to an existing table. 
+ */ +case class AppendData( + table: NamedRelation, + query: LogicalPlan, + isByName: Boolean) extends V2WriteCommand + object AppendData { def byName(table: NamedRelation, df: LogicalPlan): AppendData = { - new AppendData(table, df, true) + new AppendData(table, df, isByName = true) } def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = { - new AppendData(table, query, false) + new AppendData(table, query, isByName = false) + } +} + +/** + * Overwrite data matching a filter in an existing table. + */ +case class OverwriteByExpression( + table: NamedRelation, + deleteExpr: Expression, + query: LogicalPlan, + isByName: Boolean) extends V2WriteCommand + +object OverwriteByExpression { + def byName( + table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = { + OverwriteByExpression(table, deleteExpr, df, isByName = true) + } + + def byPosition( + table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): OverwriteByExpression = { + OverwriteByExpression(table, deleteExpr, query, isByName = false) } } +/** + * Dynamically overwrite partitions in an existing table. + */ +case class OverwritePartitionsDynamic( + table: NamedRelation, + query: LogicalPlan, + isByName: Boolean) extends V2WriteCommand + +object OverwritePartitionsDynamic { + def byName(table: NamedRelation, df: LogicalPlan): OverwritePartitionsDynamic = { + OverwritePartitionsDynamic(table, df, isByName = true) + } + + def byPosition(table: NamedRelation, query: LogicalPlan): OverwritePartitionsDynamic = { + OverwritePartitionsDynamic(table, query, isByName = false) + } +} + + /** * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the * concrete implementations during analysis. diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java new file mode 100644 index 000000000000..8058964b662b --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +/** + * Write builder trait for tables that support dynamic partition overwrite. + *
+ * A write that dynamically overwrites partitions removes all existing data in each logical + * partition for which the write will commit new data. Any existing logical partition for which the + * write does not contain data will remain unchanged. + *
+ * This is provided to implement SQL compatible with Hive table operations but is not recommended. + * Instead, use the {@link SupportsOverwrite overwrite by filter API} to explicitly replace data. + */ +public interface SupportsDynamicOverwrite extends WriteBuilder { + /** + * Configures a write to dynamically replace partitions with data committed in the write. + * + * @return this write builder for method chaining + */ + WriteBuilder overwriteDynamicPartitions(); +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java new file mode 100644 index 000000000000..d44779110aff --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +import org.apache.spark.sql.sources.Filter; + +/** + * Write builder trait for tables that support overwrite by filter. + *
+ * Overwriting data by filter will delete any data that matches the filter and replace it with data + * that is committed in the write. + */ +public interface SupportsOverwrite extends WriteBuilder { + /** + * Configures a write to replace data matching the filters with data committed in the write. + * + * @param filters filters used to match data to overwrite + * @return this write builder for method chaining + */ + WriteBuilder overwrite(Filter[] filters); +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e5f947337c94..a8c3ed9dc247 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,7 +25,7 @@ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwritePartitionsDynamic} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} @@ -264,29 +264,39 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val dsOptions = new DataSourceOptions(options.asJava) provider.getTable(dsOptions) match { case table: SupportsBatchWrite => - if (mode == SaveMode.Append) { - val relation = DataSourceV2Relation.create(table, options) - runCommand(df.sparkSession, "save") { - AppendData.byName(relation, df.logicalPlan) - } - } else { - val writeBuilder = table.newWriteBuilder(dsOptions) - .withQueryId(UUID.randomUUID().toString) - .withInputDataSchema(df.logicalPlan.schema) - writeBuilder match { - case s: SupportsSaveMode => - val write = s.mode(mode).buildForBatch() - // It can only return null with `SupportsSaveMode`. We can clean it up after - // removing `SupportsSaveMode`. - if (write != null) { - runCommand(df.sparkSession, "save") { - WriteToDataSourceV2(write, df.logicalPlan) + lazy val relation = DataSourceV2Relation.create(table, options) + mode match { + case SaveMode.Append => + runCommand(df.sparkSession, "save") { + AppendData.byName(relation, df.logicalPlan) + } + + case SaveMode.Overwrite => + // DataFrameWriter does not support static partition values, so the behavior of + // overwrite mode is to replace partitions dynamically. + runCommand(df.sparkSession, "save") { + OverwritePartitionsDynamic.byName(relation, df.logicalPlan) + } + + case _ => + table.newWriteBuilder(dsOptions) match { + case writeBuilder: SupportsSaveMode => + val write = writeBuilder.mode(mode) + .withQueryId(UUID.randomUUID().toString) + .withInputDataSchema(df.logicalPlan.schema) + .buildForBatch() + // It can only return null with `SupportsSaveMode`. We can clean it up after + // removing `SupportsSaveMode`. + if (write != null) { + runCommand(df.sparkSession, "save") { + WriteToDataSourceV2(write, df.logicalPlan) + } } - } - case _ => throw new AnalysisException( - s"data source ${table.name} does not support SaveMode $mode") - } + case _ => + throw new AnalysisException( + s"data source ${table.name} does not support SaveMode $mode") + } } // Streaming also uses the data source V2 API. 
So it may be that the data source implements diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala new file mode 100644 index 000000000000..c8542bfe5e59 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table} + +object DataSourceV2Implicits { + implicit class TableHelper(table: Table) { + def asBatchReadable: SupportsBatchRead = { + table match { + case support: SupportsBatchRead => + support + case _ => + throw new AnalysisException(s"Table does not support batch reads: ${table.name}") + } + } + + def asBatchWritable: SupportsBatchWrite = { + table match { + case support: SupportsBatchWrite => + support + case _ => + throw new AnalysisException(s"Table does not support batch writes: ${table.name}") + } + } + } + + implicit class OptionsHelper(options: Map[String, String]) { + def toDataSourceOptions: DataSourceOptions = new DataSourceOptions(options.asJava) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 47cf26dc9481..53677782c95f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -17,11 +17,6 @@ package org.apache.spark.sql.execution.datasources.v2 -import java.util.UUID - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} @@ -30,7 +25,6 @@ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.types.StructType /** * A logical plan representing a data source v2 table. 
@@ -45,26 +39,16 @@ case class DataSourceV2Relation( options: Map[String, String]) extends LeafNode with MultiInstanceRelation with NamedRelation { + import DataSourceV2Implicits._ + override def name: String = table.name() override def simpleString(maxFields: Int): String = { s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } - def newScanBuilder(): ScanBuilder = table match { - case s: SupportsBatchRead => - val dsOptions = new DataSourceOptions(options.asJava) - s.newScanBuilder(dsOptions) - case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}") - } - - def newWriteBuilder(schema: StructType): WriteBuilder = table match { - case s: SupportsBatchWrite => - val dsOptions = new DataSourceOptions(options.asJava) - s.newWriteBuilder(dsOptions) - .withQueryId(UUID.randomUUID().toString) - .withInputDataSchema(schema) - case _ => throw new AnalysisException(s"Table is not writable: ${table.name()}") + def newScanBuilder(): ScanBuilder = { + table.asBatchReadable.newScanBuilder(options.toDataSourceOptions) } override def computeStats(): Statistics = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index d6d17d6df7b1..ec94b41f113e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -19,18 +19,19 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.mutable -import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression} +import org.apache.spark.SparkException +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition} import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode -object DataSourceV2Strategy extends Strategy { +object DataSourceV2Strategy extends Strategy with PredicateHelper { /** * Pushes down filters to the data source reader @@ -100,6 +101,7 @@ object DataSourceV2Strategy extends Strategy { } } + import DataSourceV2Implicits._ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(project, filters, relation: DataSourceV2Relation) => @@ -146,14 +148,22 @@ object DataSourceV2Strategy extends Strategy { WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - val writeBuilder = r.newWriteBuilder(query.schema) - 
writeBuilder match { - case s: SupportsSaveMode => - val write = s.mode(SaveMode.Append).buildForBatch() - assert(write != null) - WriteToDataSourceV2Exec(write, planLater(query)) :: Nil - case _ => throw new AnalysisException(s"data source ${r.name} does not support SaveMode") - } + AppendDataExec( + r.table.asBatchWritable, r.options.toDataSourceOptions, planLater(query)) :: Nil + + case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) => + // fail if any filter cannot be converted. correctness depends on removing all matching data. + val filters = splitConjunctivePredicates(deleteExpr).map { + filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse( + throw new SparkException(s"Cannot translate expression to source filter: $filter")) + }.toArray + + OverwriteByExpressionExec( + r.table.asBatchWritable, filters, r.options.toDataSourceOptions, planLater(query)) :: Nil + + case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) => + OverwritePartitionsDynamicExec(r.table.asBatchWritable, + r.options.toDataSourceOptions, planLater(query)) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 50c5e4f2ad7d..19772af08a60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -17,17 +17,22 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + import scala.util.control.NonFatal import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.executor.CommitDeniedException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, WriteBuilder, WriterCommitMessage} import org.apache.spark.util.{LongAccumulator, Utils} /** @@ -41,18 +46,107 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) override def output: Seq[Attribute] = Nil } +case class AppendDataExec( + table: SupportsBatchWrite, + writeOptions: DataSourceOptions, + query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + + override protected def doExecute(): RDD[InternalRow] = { + val batchWrite = newWriteBuilder() match { + case builder: SupportsSaveMode => // TODO: Remove this + builder.mode(SaveMode.Append).buildForBatch() + + case builder => + builder.buildForBatch() + } + doWrite(batchWrite) + } +} + +case class OverwriteByExpressionExec( + table: SupportsBatchWrite, + filters: Array[Filter], + writeOptions: DataSourceOptions, + query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + + override protected def doExecute(): RDD[InternalRow] = { + val 
batchWrite = newWriteBuilder() match { + case builder: SupportsOverwrite => + builder.overwrite(filters).buildForBatch() + + case builder: SupportsSaveMode => // TODO: Remove this + builder.mode(SaveMode.Overwrite).buildForBatch() + + case _ => + throw new SparkException(s"Table does not support dynamic partition overwrite: $table") + } + + doWrite(batchWrite) + } +} + +case class OverwritePartitionsDynamicExec( + table: SupportsBatchWrite, + writeOptions: DataSourceOptions, + query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + + override protected def doExecute(): RDD[InternalRow] = { + val batchWrite = newWriteBuilder() match { + case builder: SupportsDynamicOverwrite => + builder.overwriteDynamicPartitions().buildForBatch() + + case builder: SupportsSaveMode => // TODO: Remove this + builder.mode(SaveMode.Overwrite).buildForBatch() + + case _ => + throw new SparkException(s"Table does not support dynamic partition overwrite: $table") + } + + doWrite(batchWrite) + } +} + +case class WriteToDataSourceV2Exec( + batchWrite: BatchWrite, + query: SparkPlan + ) extends V2TableWriteExec { + + import DataSourceV2Implicits._ + + def writeOptions: DataSourceOptions = Map.empty[String, String].toDataSourceOptions + + override protected def doExecute(): RDD[InternalRow] = { + doWrite(batchWrite) + } +} + +/** + * Helper for physical plans that build batch writes. + */ +private[sql] trait BatchWriteHelper { + def table: SupportsBatchWrite + def query: SparkPlan + def writeOptions: DataSourceOptions + + def newWriteBuilder(): WriteBuilder = { + table.newWriteBuilder(writeOptions) + .withInputDataSchema(query.schema) + .withQueryId(UUID.randomUUID().toString) + } +} + /** - * The physical plan for writing data into data source v2. + * The base physical plan for writing data into data source v2. */ -case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan) - extends UnaryExecNode { +private[sql] trait V2TableWriteExec extends UnaryExecNode { + def query: SparkPlan var commitProgress: Option[StreamWriterCommitProgress] = None override def child: SparkPlan = query override def output: Seq[Attribute] = Nil - override protected def doExecute(): RDD[InternalRow] = { + protected def doWrite(batchWrite: BatchWrite): RDD[InternalRow] = { val writerFactory = batchWrite.createBatchWriterFactory() val useCommitCoordinator = batchWrite.useCommitCoordinator val rdd = query.execute() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index 511fdfe5c23a..6b5c45e40ab0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -351,19 +351,21 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { } } - test("SPARK-25700: do not read schema when writing in other modes except append mode") { + test("SPARK-25700: do not read schema when writing in other modes except append and overwrite") { withTempPath { file => val cls = classOf[SimpleWriteOnlyDataSource] val path = file.getCanonicalPath val df = spark.range(5).select('id as 'i, -'id as 'j) // non-append mode should not throw exception, as they don't access schema. 
df.write.format(cls.getName).option("path", path).mode("error").save() - df.write.format(cls.getName).option("path", path).mode("overwrite").save() df.write.format(cls.getName).option("path", path).mode("ignore").save() - // append mode will access schema and should throw exception. + // append and overwrite modes will access the schema and should throw exception. intercept[SchemaReadAttemptException] { df.write.format(cls.getName).option("path", path).mode("append").save() } + intercept[SchemaReadAttemptException] { + df.write.format(cls.getName).option("path", path).mode("overwrite").save() + } } } } From aca75ae63832d37c4167ea54b94af0ca59d9508a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 27 Jan 2019 13:01:25 -0800 Subject: [PATCH 2/9] Update V2 analysis suite to cover overwrite plans. --- .../sql/catalyst/analysis/Analyzer.scala | 8 +- .../plans/logical/basicLogicalOperators.scala | 8 +- .../analysis/DataSourceV2AnalysisSuite.scala | 191 +++++++++++++----- 3 files changed, 146 insertions(+), 61 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5707161cd36c..83c9d1e9d964 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -159,6 +159,7 @@ class Analyzer( Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: ResolveRelations :: + ResolveOutputRelation :: ResolveReferences :: ResolveCreateNamedStruct :: ResolveDeserializer :: @@ -178,7 +179,6 @@ class Analyzer( ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: - ResolveOutputRelation :: ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: @@ -2246,7 +2246,7 @@ class Analyzer( object ResolveOutputRelation extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { case append @ AppendData(table, query, isByName) - if table.resolved && query.resolved && !append.resolved => + if table.resolved && query.resolved && !append.writeResolved => val projection = resolveOutputColumns(table.name, table.output, query, isByName) if (projection != query) { @@ -2256,7 +2256,7 @@ class Analyzer( } case overwrite @ OverwriteByExpression(table, _, query, isByName) - if table.resolved && query.resolved && !overwrite.resolved => + if table.resolved && query.resolved && !overwrite.writeResolved => val projection = resolveOutputColumns(table.name, table.output, query, isByName) if (projection != query) { @@ -2266,7 +2266,7 @@ class Analyzer( } case overwrite @ OverwritePartitionsDynamic(table, query, isByName) - if table.resolved && query.resolved && !overwrite.resolved => + if table.resolved && query.resolved && !overwrite.writeResolved => val projection = resolveOutputColumns(table.name, table.output, query, isByName) if (projection != query) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 82e4b90979ec..e9e94a0dc2ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -373,7 +373,9 @@ private[sql] trait V2WriteCommand extends Command { 
override def children: Seq[LogicalPlan] = Seq(query) - override lazy val resolved: Boolean = { + override lazy val resolved: Boolean = writeResolved + + def writeResolved: Boolean = { table.resolved && query.resolved && query.output.size == table.output.size && query.output.zip(table.output).forall { case (inAttr, outAttr) => @@ -410,7 +412,9 @@ case class OverwriteByExpression( table: NamedRelation, deleteExpr: Expression, query: LogicalPlan, - isByName: Boolean) extends V2WriteCommand + isByName: Boolean) extends V2WriteCommand { + override lazy val resolved: Boolean = writeResolved && deleteExpr.resolved +} object OverwriteByExpression { def byName( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index 6c899b610ac5..6d0a215b54c4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -19,15 +19,92 @@ package org.apache.spark.sql.catalyst.analysis import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, UpCast} -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, LessThanOrEqual, Literal} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Project} import org.apache.spark.sql.types.{DoubleType, FloatType, StructField, StructType} +class V2AppendDataAnalysisSuite extends DataSourceV2AnalysisSuite { + override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = { + AppendData.byName(table, query) + } + + override def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan = { + AppendData.byPosition(table, query) + } +} + +class V2OverwritePartitionsDynamicAnalysisSuite extends DataSourceV2AnalysisSuite { + override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = { + OverwritePartitionsDynamic.byName(table, query) + } + + override def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan = { + OverwritePartitionsDynamic.byPosition(table, query) + } +} + +class V2OverwriteByExpressionAnalysisSuite extends DataSourceV2AnalysisSuite { + override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = { + OverwriteByExpression.byName(table, query, Literal(true)) + } + + override def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan = { + OverwriteByExpression.byPosition(table, query, Literal(true)) + } + + test("delete expression is resolved using table fields") { + val table = TestRelation(StructType(Seq( + StructField("x", DoubleType, nullable = false), + StructField("y", DoubleType))).toAttributes) + + val query = TestRelation(StructType(Seq( + StructField("a", DoubleType, nullable = false), + StructField("b", DoubleType))).toAttributes) + + val a = query.output.head + val b = query.output.last + val x = table.output.head + + val parsedPlan = OverwriteByExpression.byPosition(table, query, + LessThanOrEqual(UnresolvedAttribute(Seq("x")), Literal(15.0d))) + + val expectedPlan = OverwriteByExpression.byPosition(table, + Project(Seq( + Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(), + 
Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()), + query), + LessThanOrEqual( + AttributeReference("x", DoubleType, nullable = false)(x.exprId), + Literal(15.0d))) + + assertNotResolved(parsedPlan) + checkAnalysis(parsedPlan, expectedPlan) + assertResolved(expectedPlan) + } + + test("delete expression cannot be resolved using query fields") { + val xRequiredTable = TestRelation(StructType(Seq( + StructField("x", DoubleType, nullable = false), + StructField("y", DoubleType))).toAttributes) + + val query = TestRelation(StructType(Seq( + StructField("a", DoubleType, nullable = false), + StructField("b", DoubleType))).toAttributes) + + // the write is resolved (checked above). this test plan is not because of the expression. + val parsedPlan = OverwriteByExpression.byPosition(xRequiredTable, query, + LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d))) + + assertNotResolved(parsedPlan) + assertAnalysisError(parsedPlan, Seq("cannot resolve", "`a`", "given input columns", "x, y")) + } +} + case class TestRelation(output: Seq[AttributeReference]) extends LeafNode with NamedRelation { override def name: String = "table-name" } -class DataSourceV2AnalysisSuite extends AnalysisTest { +abstract class DataSourceV2AnalysisSuite extends AnalysisTest { val table = TestRelation(StructType(Seq( StructField("x", FloatType), StructField("y", FloatType))).toAttributes) @@ -40,21 +117,25 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("x", DoubleType), StructField("y", DoubleType))).toAttributes) - test("Append.byName: basic behavior") { + def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan + + def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan + + test("byName: basic behavior") { val query = TestRelation(table.schema.toAttributes) - val parsedPlan = AppendData.byName(table, query) + val parsedPlan = byName(table, query) checkAnalysis(parsedPlan, parsedPlan) assertResolved(parsedPlan) } - test("Append.byName: does not match by position") { + test("byName: does not match by position") { val query = TestRelation(StructType(Seq( StructField("a", FloatType), StructField("b", FloatType))).toAttributes) - val parsedPlan = AppendData.byName(table, query) + val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -62,12 +143,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Cannot find data for output column", "'x'", "'y'")) } - test("Append.byName: case sensitive column resolution") { + test("byName: case sensitive column resolution") { val query = TestRelation(StructType(Seq( StructField("X", FloatType), // doesn't match case! StructField("y", FloatType))).toAttributes) - val parsedPlan = AppendData.byName(table, query) + val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -76,7 +157,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { caseSensitive = true) } - test("Append.byName: case insensitive column resolution") { + test("byName: case insensitive column resolution") { val query = TestRelation(StructType(Seq( StructField("X", FloatType), // doesn't match case! 
StructField("y", FloatType))).toAttributes) @@ -84,8 +165,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { val X = query.output.head val y = query.output.last - val parsedPlan = AppendData.byName(table, query) - val expectedPlan = AppendData.byName(table, + val parsedPlan = byName(table, query) + val expectedPlan = byName(table, Project(Seq( Alias(Cast(toLower(X), FloatType, Some(conf.sessionLocalTimeZone)), "x")(), Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()), @@ -96,7 +177,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { assertResolved(expectedPlan) } - test("Append.byName: data columns are reordered by name") { + test("byName: data columns are reordered by name") { // out of order val query = TestRelation(StructType(Seq( StructField("y", FloatType), @@ -105,8 +186,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { val y = query.output.head val x = query.output.last - val parsedPlan = AppendData.byName(table, query) - val expectedPlan = AppendData.byName(table, + val parsedPlan = byName(table, query) + val expectedPlan = byName(table, Project(Seq( Alias(Cast(x, FloatType, Some(conf.sessionLocalTimeZone)), "x")(), Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()), @@ -117,26 +198,26 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { assertResolved(expectedPlan) } - test("Append.byName: fail nullable data written to required columns") { - val parsedPlan = AppendData.byName(requiredTable, table) + test("byName: fail nullable data written to required columns") { + val parsedPlan = byName(requiredTable, table) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", "Cannot write nullable values to non-null column", "'x'", "'y'")) } - test("Append.byName: allow required data written to nullable columns") { - val parsedPlan = AppendData.byName(table, requiredTable) + test("byName: allow required data written to nullable columns") { + val parsedPlan = byName(table, requiredTable) assertResolved(parsedPlan) checkAnalysis(parsedPlan, parsedPlan) } - test("Append.byName: missing required columns cause failure and are identified by name") { + test("byName: missing required columns cause failure and are identified by name") { // missing required field x val query = TestRelation(StructType(Seq( StructField("y", FloatType, nullable = false))).toAttributes) - val parsedPlan = AppendData.byName(requiredTable, query) + val parsedPlan = byName(requiredTable, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -144,12 +225,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Cannot find data for output column", "'x'")) } - test("Append.byName: missing optional columns cause failure and are identified by name") { + test("byName: missing optional columns cause failure and are identified by name") { // missing optional field x val query = TestRelation(StructType(Seq( StructField("y", FloatType))).toAttributes) - val parsedPlan = AppendData.byName(table, query) + val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -157,8 +238,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Cannot find data for output column", "'x'")) } - test("Append.byName: fail canWrite check") { - val parsedPlan = AppendData.byName(table, widerTable) + test("byName: fail canWrite check") { + val parsedPlan = byName(table, widerTable) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, 
Seq( @@ -166,12 +247,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) } - test("Append.byName: insert safe cast") { + test("byName: insert safe cast") { val x = table.output.head val y = table.output.last - val parsedPlan = AppendData.byName(widerTable, table) - val expectedPlan = AppendData.byName(widerTable, + val parsedPlan = byName(widerTable, table) + val expectedPlan = byName(widerTable, Project(Seq( Alias(Cast(x, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(), Alias(Cast(y, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()), @@ -182,13 +263,13 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { assertResolved(expectedPlan) } - test("Append.byName: fail extra data fields") { + test("byName: fail extra data fields") { val query = TestRelation(StructType(Seq( StructField("x", FloatType), StructField("y", FloatType), StructField("z", FloatType))).toAttributes) - val parsedPlan = AppendData.byName(table, query) + val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -197,7 +278,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Data columns: 'x', 'y', 'z'")) } - test("Append.byName: multiple field errors are reported") { + test("byName: multiple field errors are reported") { val xRequiredTable = TestRelation(StructType(Seq( StructField("x", FloatType, nullable = false), StructField("y", DoubleType))).toAttributes) @@ -206,7 +287,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("x", DoubleType), StructField("b", FloatType))).toAttributes) - val parsedPlan = AppendData.byName(xRequiredTable, query) + val parsedPlan = byName(xRequiredTable, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -216,7 +297,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Cannot find data for output column", "'y'")) } - test("Append.byPosition: basic behavior") { + test("byPosition: basic behavior") { val query = TestRelation(StructType(Seq( StructField("a", FloatType), StructField("b", FloatType))).toAttributes) @@ -224,8 +305,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { val a = query.output.head val b = query.output.last - val parsedPlan = AppendData.byPosition(table, query) - val expectedPlan = AppendData.byPosition(table, + val parsedPlan = byPosition(table, query) + val expectedPlan = byPosition(table, Project(Seq( Alias(Cast(a, FloatType, Some(conf.sessionLocalTimeZone)), "x")(), Alias(Cast(b, FloatType, Some(conf.sessionLocalTimeZone)), "y")()), @@ -236,7 +317,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { assertResolved(expectedPlan) } - test("Append.byPosition: data columns are not reordered") { + test("byPosition: data columns are not reordered") { // out of order val query = TestRelation(StructType(Seq( StructField("y", FloatType), @@ -245,8 +326,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { val y = query.output.head val x = query.output.last - val parsedPlan = AppendData.byPosition(table, query) - val expectedPlan = AppendData.byPosition(table, + val parsedPlan = byPosition(table, query) + val expectedPlan = byPosition(table, Project(Seq( Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "x")(), Alias(Cast(x, FloatType, Some(conf.sessionLocalTimeZone)), "y")()), @@ -257,26 +338,26 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { assertResolved(expectedPlan) } - test("Append.byPosition: fail nullable data written to required 
columns") { - val parsedPlan = AppendData.byPosition(requiredTable, table) + test("byPosition: fail nullable data written to required columns") { + val parsedPlan = byPosition(requiredTable, table) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", "Cannot write nullable values to non-null column", "'x'", "'y'")) } - test("Append.byPosition: allow required data written to nullable columns") { - val parsedPlan = AppendData.byPosition(table, requiredTable) + test("byPosition: allow required data written to nullable columns") { + val parsedPlan = byPosition(table, requiredTable) assertResolved(parsedPlan) checkAnalysis(parsedPlan, parsedPlan) } - test("Append.byPosition: missing required columns cause failure") { + test("byPosition: missing required columns cause failure") { // missing optional field x val query = TestRelation(StructType(Seq( StructField("y", FloatType, nullable = false))).toAttributes) - val parsedPlan = AppendData.byPosition(requiredTable, query) + val parsedPlan = byPosition(requiredTable, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -285,12 +366,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Data columns: 'y'")) } - test("Append.byPosition: missing optional columns cause failure") { + test("byPosition: missing optional columns cause failure") { // missing optional field x val query = TestRelation(StructType(Seq( StructField("y", FloatType))).toAttributes) - val parsedPlan = AppendData.byPosition(table, query) + val parsedPlan = byPosition(table, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -299,12 +380,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Data columns: 'y'")) } - test("Append.byPosition: fail canWrite check") { + test("byPosition: fail canWrite check") { val widerTable = TestRelation(StructType(Seq( StructField("a", DoubleType), StructField("b", DoubleType))).toAttributes) - val parsedPlan = AppendData.byPosition(table, widerTable) + val parsedPlan = byPosition(table, widerTable) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -312,7 +393,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) } - test("Append.byPosition: insert safe cast") { + test("byPosition: insert safe cast") { val widerTable = TestRelation(StructType(Seq( StructField("a", DoubleType), StructField("b", DoubleType))).toAttributes) @@ -320,8 +401,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { val x = table.output.head val y = table.output.last - val parsedPlan = AppendData.byPosition(widerTable, table) - val expectedPlan = AppendData.byPosition(widerTable, + val parsedPlan = byPosition(widerTable, table) + val expectedPlan = byPosition(widerTable, Project(Seq( Alias(Cast(x, DoubleType, Some(conf.sessionLocalTimeZone)), "a")(), Alias(Cast(y, DoubleType, Some(conf.sessionLocalTimeZone)), "b")()), @@ -332,13 +413,13 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { assertResolved(expectedPlan) } - test("Append.byPosition: fail extra data fields") { + test("byPosition: fail extra data fields") { val query = TestRelation(StructType(Seq( StructField("a", FloatType), StructField("b", FloatType), StructField("c", FloatType))).toAttributes) - val parsedPlan = AppendData.byName(table, query) + val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( @@ -347,7 +428,7 @@ class 
DataSourceV2AnalysisSuite extends AnalysisTest { "Data columns: 'a', 'b', 'c'")) } - test("Append.byPosition: multiple field errors are reported") { + test("byPosition: multiple field errors are reported") { val xRequiredTable = TestRelation(StructType(Seq( StructField("x", FloatType, nullable = false), StructField("y", DoubleType))).toAttributes) @@ -356,7 +437,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest { StructField("x", DoubleType), StructField("b", FloatType))).toAttributes) - val parsedPlan = AppendData.byPosition(xRequiredTable, query) + val parsedPlan = byPosition(xRequiredTable, query) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( From 8993f19023c2ba1a4e5d7ba46497e0c6f39ab49d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 27 Jan 2019 15:00:31 -0800 Subject: [PATCH 3/9] Add SupportsTruncate for sources that do not support overwrite. --- .../sources/v2/writer/SupportsOverwrite.java | 8 ++++- .../sources/v2/writer/SupportsTruncate.java | 32 +++++++++++++++++++ .../apache/spark/sql/DataFrameWriter.scala | 8 ++--- .../datasources/DataSourceStrategy.scala | 6 ++++ .../v2/WriteToDataSourceV2Exec.scala | 11 +++++-- .../apache/spark/sql/sources/filters.scala | 20 ++++++++++++ 6 files changed, 78 insertions(+), 7 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java index d44779110aff..ea1893e0f4dc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.sources.v2.writer; +import org.apache.spark.sql.sources.AlwaysTrue$; import org.apache.spark.sql.sources.Filter; /** @@ -25,7 +26,7 @@ * Overwriting data by filter will delete any data that matches the filter and replace it with data * that is committed in the write. */ -public interface SupportsOverwrite extends WriteBuilder { +public interface SupportsOverwrite extends WriteBuilder, SupportsTruncate { /** * Configures a write to replace data matching the filters with data committed in the write. * @@ -33,4 +34,9 @@ public interface SupportsOverwrite extends WriteBuilder { * @return this write builder for method chaining */ WriteBuilder overwrite(Filter[] filters); + + @Override + default WriteBuilder truncate() { + return overwrite(new Filter[] { AlwaysTrue$.MODULE$ }); + } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java new file mode 100644 index 000000000000..69c2ba5e01a4 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources.v2.writer; + +/** + * Write builder trait for tables that support truncation. + *
+ * Truncation removes all data in a table and replaces it with data that is committed in the write. + */ +public interface SupportsTruncate extends WriteBuilder { + /** + * Configures a write to replace all existing data with data committed in the write. + * + * @return this write builder for method chaining + */ + WriteBuilder truncate(); +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a8c3ed9dc247..450828172b93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,7 +25,8 @@ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwritePartitionsDynamic} +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} @@ -272,10 +273,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } case SaveMode.Overwrite => - // DataFrameWriter does not support static partition values, so the behavior of - // overwrite mode is to replace partitions dynamically. + // truncate the table runCommand(df.sparkSession, "save") { - OverwritePartitionsDynamic.byName(relation, df.logicalPlan) + OverwriteByExpression.byName(relation, df.logicalPlan, Literal(true)) } case _ => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 273cc3b19302..b73dc30d6f23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -529,6 +529,12 @@ object DataSourceStrategy { case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) => Some(sources.StringContains(a.name, v.toString)) + case expressions.Literal(true, BooleanType) => + Some(sources.AlwaysTrue) + + case expressions.Literal(false, BooleanType) => + Some(sources.AlwaysFalse) + case _ => None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 19772af08a60..65b7703707ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.{AlwaysTrue, Filter} import 
org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite} -import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.util.{LongAccumulator, Utils} /** @@ -69,8 +69,15 @@ case class OverwriteByExpressionExec( writeOptions: DataSourceOptions, query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { + private def isTruncate(filters: Array[Filter]): Boolean = { + filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue] + } + override protected def doExecute(): RDD[InternalRow] = { val batchWrite = newWriteBuilder() match { + case builder: SupportsTruncate if isTruncate(filters) => + builder.truncate().buildForBatch() + case builder: SupportsOverwrite => builder.overwrite(filters).buildForBatch() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 3f941cc6e107..d347863b728c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -218,3 +218,23 @@ case class StringEndsWith(attribute: String, value: String) extends Filter { case class StringContains(attribute: String, value: String) extends Filter { override def references: Array[String] = Array(attribute) } + +/** + * A filter that always evaluates to `true`. + */ +case class AlwaysTrue() extends Filter { + override def references: Array[String] = Array.empty +} + +object AlwaysTrue extends AlwaysTrue { +} + +/** + * A filter that always evaluates to `false`. + */ +case class AlwaysFalse() extends Filter { + override def references: Array[String] = Array.empty +} + +object AlwaysFalse extends AlwaysFalse { +} From cc5fdac8b0d96ee725c878fd8e568dca4e230fea Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 27 Jan 2019 15:22:23 -0800 Subject: [PATCH 4/9] Fix visibility caught by reviews. 
--- .../sql/catalyst/plans/logical/basicLogicalOperators.scala | 2 +- .../execution/datasources/v2/WriteToDataSourceV2Exec.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index e9e94a0dc2ea..b19949d690e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -367,7 +367,7 @@ case class Join( /** * Base trait for DataSourceV2 write commands */ -private[sql] trait V2WriteCommand extends Command { +trait V2WriteCommand extends Command { def table: NamedRelation def query: LogicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 65b7703707ac..eae4760accc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -130,7 +130,7 @@ case class WriteToDataSourceV2Exec( /** * Helper for physical plans that build batch writes. */ -private[sql] trait BatchWriteHelper { +trait BatchWriteHelper { def table: SupportsBatchWrite def query: SparkPlan def writeOptions: DataSourceOptions @@ -145,7 +145,7 @@ private[sql] trait BatchWriteHelper { /** * The base physical plan for writing data into data source v2. */ -private[sql] trait V2TableWriteExec extends UnaryExecNode { +trait V2TableWriteExec extends UnaryExecNode { def query: SparkPlan var commitProgress: Option[StreamWriterCommitProgress] = None From d6927542f118a96c13393d9d8e98619c2987d30e Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 1 Feb 2019 12:42:06 -0800 Subject: [PATCH 5/9] Ignore unresolved writes in ResolveReferences. This ensures that the query and table columns for a write are resolved before expressions are resolved so that expressions are resolved using field names from the table, not the query. This also reverts moving ResolveOutputRelation because rule order no longer matters. 
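
For illustration, a sketch of the case this ordering handles, mirroring the new test in
DataSourceV2AnalysisSuite (table columns x/y, query columns a/b; the 15.0 literal is just an
example value):

    val table = TestRelation(StructType(Seq(
      StructField("x", DoubleType, nullable = false),
      StructField("y", DoubleType))).toAttributes)
    val query = TestRelation(StructType(Seq(
      StructField("a", DoubleType, nullable = false),
      StructField("b", DoubleType))).toAttributes)

    // The delete expression references table column `x`, while the by-position query produces
    // columns a and b. ResolveReferences now leaves this plan alone until ResolveOutputRelation
    // has aliased a/b to x/y, so `x` resolves against the table's attributes, not the query's.
    val parsedPlan = OverwriteByExpression.byPosition(table, query,
      LessThanOrEqual(UnresolvedAttribute(Seq("x")), Literal(15.0d)))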
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 7 ++++++-
 .../sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 83c9d1e9d964..51de574b908e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -159,7 +159,6 @@ class Analyzer(
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions ::
       ResolveRelations ::
-      ResolveOutputRelation ::
       ResolveReferences ::
       ResolveCreateNamedStruct ::
       ResolveDeserializer ::
@@ -179,6 +178,7 @@ class Analyzer(
       ResolveWindowOrder ::
       ResolveWindowFrame ::
       ResolveNaturalAndUsingJoin ::
+      ResolveOutputRelation ::
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
@@ -978,6 +978,11 @@ class Analyzer(
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
+      case o: OverwriteByExpression if !o.writeResolved =>
+        // do not resolve expression attributes until the query attributes are resolved against the
+        // table by ResolveOutputRelation. that rule will alias the attributes to the table's names.
+        o
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 6d0a215b54c4..0c4854861426 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -82,7 +82,7 @@ class V2OverwriteByExpressionAnalysisSuite extends DataSourceV2AnalysisSuite {
     assertResolved(expectedPlan)
   }
 
-  test("delete expression cannot be resolved using query fields") {
+  test("delete expression is not resolved using query fields") {
     val xRequiredTable = TestRelation(StructType(Seq(
       StructField("x", DoubleType, nullable = false),
       StructField("y", DoubleType))).toAttributes)

From 84d01ab1aa4c2bcf6ce4bd44122cb4b203919d31 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Fri, 1 Feb 2019 12:56:50 -0800
Subject: [PATCH 6/9] Remove TODO comments that aren't helpful.
---
 .../execution/datasources/v2/WriteToDataSourceV2Exec.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index eae4760accc9..808e88dc1c91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -53,7 +53,7 @@ case class AppendDataExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
     val batchWrite = newWriteBuilder() match {
-      case builder: SupportsSaveMode => // TODO: Remove this
+      case builder: SupportsSaveMode =>
         builder.mode(SaveMode.Append).buildForBatch()
 
       case builder =>
@@ -81,7 +81,7 @@ case class OverwriteByExpressionExec(
       case builder: SupportsOverwrite =>
         builder.overwrite(filters).buildForBatch()
 
-      case builder: SupportsSaveMode => // TODO: Remove this
+      case builder: SupportsSaveMode =>
         builder.mode(SaveMode.Overwrite).buildForBatch()
 
       case _ =>
@@ -102,7 +102,7 @@ case class OverwritePartitionsDynamicExec(
       case builder: SupportsDynamicOverwrite =>
         builder.overwriteDynamicPartitions().buildForBatch()
 
-      case builder: SupportsSaveMode => // TODO: Remove this
+      case builder: SupportsSaveMode =>
         builder.mode(SaveMode.Overwrite).buildForBatch()
 
      case _ =>

From c47575eda765e9d9b7fca99436992332993af857 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Tue, 5 Feb 2019 17:03:59 -0800
Subject: [PATCH 7/9] SPARK-26666: Use ORC v1 by default.

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d285e007dac1..0b7b67ed56d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1452,7 +1452,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("")
+    .createWithDefault("orc")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +

From d67ad46f2090bb0d291c38f31f4bad521a62636c Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Wed, 13 Feb 2019 15:58:15 -0800
Subject: [PATCH 8/9] SPARK-26666: Update for review comments.
---
 .../sql/catalyst/analysis/Analyzer.scala       |  8 ++---
 .../plans/logical/basicLogicalOperators.scala  |  6 ++--
 .../v2/reader/SupportsPushDownFilters.java     |  3 ++
 .../sources/v2/writer/SupportsOverwrite.java   |  3 ++
 .../v2/WriteToDataSourceV2Exec.scala           | 36 +++++++++++++++----
 .../apache/spark/sql/sources/filters.scala     |  6 +++-
 6 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 51de574b908e..42904c5c04c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -978,7 +978,7 @@ class Analyzer(
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-      case o: OverwriteByExpression if !o.writeResolved =>
+      case o: OverwriteByExpression if !o.outputResolved =>
         // do not resolve expression attributes until the query attributes are resolved against the
         // table by ResolveOutputRelation. that rule will alias the attributes to the table's names.
         o
@@ -2251,7 +2251,7 @@ class Analyzer(
  object ResolveOutputRelation extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      case append @ AppendData(table, query, isByName)
-          if table.resolved && query.resolved && !append.writeResolved =>
+          if table.resolved && query.resolved && !append.outputResolved =>
        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
 
        if (projection != query) {
@@ -2261,7 +2261,7 @@ class Analyzer(
        }
 
      case overwrite @ OverwriteByExpression(table, _, query, isByName)
-          if table.resolved && query.resolved && !overwrite.writeResolved =>
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
 
        if (projection != query) {
@@ -2271,7 +2271,7 @@ class Analyzer(
        }
 
      case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
-          if table.resolved && query.resolved && !overwrite.writeResolved =>
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
 
        if (projection != query) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b19949d690e2..f7f701cea51f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -373,9 +373,9 @@ trait V2WriteCommand extends Command {
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
-  override lazy val resolved: Boolean = writeResolved
+  override lazy val resolved: Boolean = outputResolved
 
-  def writeResolved: Boolean = {
+  def outputResolved: Boolean = {
     table.resolved && query.resolved && query.output.size == table.output.size &&
       query.output.zip(table.output).forall {
         case (inAttr, outAttr) =>
@@ -413,7 +413,7 @@ case class OverwriteByExpression(
     deleteExpr: Expression,
     query: LogicalPlan,
     isByName: Boolean) extends V2WriteCommand {
-  override lazy val resolved: Boolean = writeResolved && deleteExpr.resolved
+  override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved
 }
 
 object OverwriteByExpression {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 296d3e47e732..f10fd884daab 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -29,6 +29,9 @@ public interface SupportsPushDownFilters extends ScanBuilder {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after scanning.
+   * <p>
+   * Rows should be returned from the data source if and only if all of the filters match. That is,
+   * filters must be interpreted as ANDed together.
    */
   Filter[] pushFilters(Filter[] filters);
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java
index ea1893e0f4dc..b443b3c3aeb4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java
@@ -29,6 +29,9 @@ public interface SupportsOverwrite extends WriteBuilder, SupportsTruncate {
 
   /**
    * Configures a write to replace data matching the filters with data committed in the write.
+   * <p>
+   * Rows must be deleted from the data source if and only if all of the filters match. That is,
+   * filters must be interpreted as ANDed together.
    *
    * @param filters filters used to match data to overwrite
    * @return this write builder for method chaining
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 808e88dc1c91..d7cb2457433b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -46,6 +46,11 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan)
   override def output: Seq[Attribute] = Nil
 }
 
+/**
+ * Physical plan node for append into a v2 table.
+ *
+ * Rows in the output data set are appended.
+ */
 case class AppendDataExec(
     table: SupportsBatchWrite,
     writeOptions: DataSourceOptions,
@@ -63,9 +68,19 @@ case class AppendDataExec(
   }
 }
 
+/**
+ * Physical plan node for overwrite into a v2 table.
+ *
+ * Overwrites data in a table matched by a set of filters. Rows matching all of the filters will be
+ * deleted and rows in the output data set are appended.
+ *
+ * This plan is used to implement SaveMode.Overwrite. The behavior of SaveMode.Overwrite is to
+ * truncate the table -- delete all rows -- and append the output data set. This uses the filter
+ * AlwaysTrue to delete all rows.
+ */
 case class OverwriteByExpressionExec(
     table: SupportsBatchWrite,
-    filters: Array[Filter],
+    deleteWhere: Array[Filter],
     writeOptions: DataSourceOptions,
     query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
 
@@ -75,15 +90,15 @@ case class OverwriteByExpressionExec(
 
   override protected def doExecute(): RDD[InternalRow] = {
     val batchWrite = newWriteBuilder() match {
-      case builder: SupportsTruncate if isTruncate(filters) =>
+      case builder: SupportsTruncate if isTruncate(deleteWhere) =>
         builder.truncate().buildForBatch()
 
-      case builder: SupportsOverwrite =>
-        builder.overwrite(filters).buildForBatch()
-
-      case builder: SupportsSaveMode =>
+      case builder: SupportsSaveMode if isTruncate(deleteWhere) =>
         builder.mode(SaveMode.Overwrite).buildForBatch()
 
+      case builder: SupportsOverwrite =>
+        builder.overwrite(deleteWhere).buildForBatch()
+
       case _ =>
         throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
     }
@@ -92,6 +107,15 @@ case class OverwriteByExpressionExec(
   }
 }
 
+/**
+ * Physical plan node for dynamic partition overwrite into a v2 table.
+ *
+ * Dynamic partition overwrite is the behavior of Hive INSERT OVERWRITE ... PARTITION queries, and
+ * Spark INSERT OVERWRITE queries when spark.sql.sources.partitionOverwriteMode=dynamic. Each
+ * partition in the output data set replaces the corresponding existing partition in the table or
+ * creates a new partition. Existing partitions for which there is no data in the output data set
+ * are not modified.
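+ *
+ * For example, with spark.sql.sources.partitionOverwriteMode=dynamic, an INSERT OVERWRITE such
+ * as the following (table names are illustrative placeholders):
+ *
+ *   INSERT OVERWRITE TABLE target PARTITION (dt) SELECT value, dt FROM source
+ *
+ * replaces only the dt partitions of target that receive rows from source; all other partitions
+ * are left as they are.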
+ */
 case class OverwritePartitionsDynamicExec(
     table: SupportsBatchWrite,
     writeOptions: DataSourceOptions,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index d347863b728c..a1ab55a7185c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.sources
 
-import org.apache.spark.annotation.Stable
+import org.apache.spark.annotation.{Evolving, Stable}
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines all the filters that we can push down to the data sources.
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -222,19 +222,23 @@ case class StringContains(attribute: String, value: String) extends Filter {
 /**
  * A filter that always evaluates to `true`.
  */
+@Evolving
 case class AlwaysTrue() extends Filter {
   override def references: Array[String] = Array.empty
 }
 
+@Evolving
 object AlwaysTrue extends AlwaysTrue {
 }
 
 /**
  * A filter that always evaluates to `false`.
  */
+@Evolving
 case class AlwaysFalse() extends Filter {
   override def references: Array[String] = Array.empty
 }
 
+@Evolving
 object AlwaysFalse extends AlwaysFalse {
 }

From 0e42cc28b173f55c645c863732fb13d1343d4d75 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Fri, 15 Feb 2019 09:26:35 -0800
Subject: [PATCH 9/9] SPARK-26666: Use AnalysisException in DataSourceV2Strategy.

---
 .../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index ec94b41f113e..55d7b0a18cbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
@@ -155,7 +154,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
      // fail if any filter cannot be converted. correctness depends on removing all matching data.
      val filters = splitConjunctivePredicates(deleteExpr).map { filter =>
        DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
-          throw new SparkException(s"Cannot translate expression to source filter: $filter"))
+          throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
      }.toArray
 
      OverwriteByExpressionExec(