diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index aa44e677e577..5c23373af0a0 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -214,6 +214,7 @@ statement
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
     | RESET                                                            #resetConfiguration
+    | DELETE FROM multipartIdentifier tableAlias whereClause           #deleteFromTable
     | unsupportedHiveNativeCommands .*?                                #failNativeCommand
     ;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java
new file mode 100644
index 000000000000..8650a0ef1d4b
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/sources/v2/SupportsDelete.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Table} delete support. Data sources can implement this
+ * interface to provide the ability to delete data from tables that match filter expressions.
+ */
+public interface SupportsDelete {
+  /**
+   * Delete data from a data source table that matches filter expressions.
+   * <p>
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * expressions must be interpreted as a set of filters that are ANDed together.
+   * <p>
+   * Implementations may reject a delete operation if the delete isn't possible without significant
+   * effort. For example, partitioned data sources may reject deletes that do not filter by
+   * partition columns because the filter may require rewriting files without deleted records.
+   * To reject a delete, implementations should throw {@link IllegalArgumentException} with a clear
+   * error message that identifies which expression was rejected.
+   *
+   * @param filters filter expressions, used to select rows to delete when all expressions match
+   * @throws IllegalArgumentException If the delete is rejected due to required effort
+   */
+  void deleteWhere(Filter[] filters);
+}
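Before the analyzer and planner changes below, it may help to see what a source has to provide. The following is a minimal, hypothetical sketch of a `SupportsDelete` implementation; the `KeyValueTable` name, its single-key storage, and its schema are invented for illustration, and a real source would also mix in the read/write interfaces:

```scala
import java.util

import scala.collection.mutable

import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.sources.v2.{SupportsDelete, Table, TableCapability}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical example: a toy table keyed by an integer "id" column.
class KeyValueTable extends Table with SupportsDelete {
  private val rows = mutable.Map[Int, String]()

  override def name: String = "kv"
  override def schema: StructType =
    new StructType().add("id", IntegerType).add("data", StringType)
  override def capabilities: util.Set[TableCapability] = util.Collections.emptySet()

  override def deleteWhere(filters: Array[Filter]): Unit = {
    // The contract is AND semantics: delete exactly the rows for which every
    // filter matches, and reject anything this store cannot evaluate.
    val matched = rows.keys.filter { id =>
      filters.forall {
        case EqualTo("id", value) => value == id
        case f => throw new IllegalArgumentException(s"Cannot delete by filter: $f")
      }
    }.toSeq
    rows --= matched
  }
}
```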
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f8eef0cf3236..f88a8bcff521 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1761,6 +1761,8 @@ class Analyzer(
       // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries.
       case q: UnaryNode if q.childrenResolved =>
         resolveSubQueries(q, q.children)
+      case d: DeleteFromTable if d.childrenResolved =>
+        resolveSubQueries(d, d.children)
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 519c558d1277..bd54c66992da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -585,7 +585,7 @@ trait CheckAnalysis extends PredicateHelper {
         // Only certain operators are allowed to host subquery expression containing
         // outer references.
         plan match {
-          case _: Filter | _: Aggregate | _: Project => // Ok
+          case _: Filter | _: Aggregate | _: Project | _: DeleteFromTable => // Ok
           case other => failAnalysis(
             "Correlated scalar sub-queries can only be used in a " +
               s"Filter/Aggregate/Project: $plan")
@@ -594,9 +594,10 @@ trait CheckAnalysis extends PredicateHelper {
       case inSubqueryOrExistsSubquery =>
         plan match {
-          case _: Filter => // Ok
+          case _: Filter | _: DeleteFromTable => // Ok
           case _ =>
-            failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in a Filter: $plan")
+            failAnalysis("IN/EXISTS predicate sub-queries can only be used in" +
+              s" Filter/DeleteFromTable: $plan")
         }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 49ca09d9ef07..179345225d90 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -338,6 +338,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     throw new ParseException("INSERT OVERWRITE DIRECTORY is not supported", ctx)
   }
 
+  override def visitDeleteFromTable(
+      ctx: DeleteFromTableContext): LogicalPlan = withOrigin(ctx) {
+
+    val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val tableAlias = if (ctx.tableAlias() != null) {
+      val ident = ctx.tableAlias().strictIdentifier()
+      if (ident != null) { Some(ident.getText) } else { None }
+    } else {
+      None
+    }
+
+    DeleteFromStatement(tableId, tableAlias, expression(ctx.whereClause().booleanExpression()))
+  }
+
   /**
    * Create a partition specification map.
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index d9c370af47fb..968a561da9c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -567,6 +567,13 @@ case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Comm
   override val output = DescribeTableSchema.describeTableAttributes()
 }
 
+case class DeleteFromTable(
+    child: LogicalPlan,
+    condition: Expression) extends Command {
+
+  override def children: Seq[LogicalPlan] = child :: Nil
+}
+
 /**
  * Drop a table.
  */
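To make the plumbing concrete: the parser first emits a `DeleteFromStatement` (the file introduced next), which is later converted into the `DeleteFromTable` node above. A rough sketch of what that looks like through catalyst's parser; the assertions illustrate the expected shape and are not part of the patch:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.sql.DeleteFromStatement

val parsed = CatalystSqlParser.parsePlan(
  "DELETE FROM testcat.ns1.ns2.tbl AS t WHERE t.id = 2")

parsed match {
  case DeleteFromStatement(tableName, tableAlias, condition) =>
    assert(tableName == Seq("testcat", "ns1", "ns2", "tbl"))
    assert(tableAlias.contains("t"))
    // condition holds the still-unresolved predicate for 't.id = 2
  case other => sys.error(s"unexpected plan: $other")
}
```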
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
new file mode 100644
index 000000000000..21e24127eee3
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/DeleteFromStatement.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+case class DeleteFromStatement(
+    tableName: Seq[String],
+    tableAlias: Option[String],
+    condition: Expression)
+  extends ParsedStatement
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index eed69cdc8cac..2d59c42ee868 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.sources.v2.{SupportsDelete, SupportsRead, SupportsWrite, Table, TableCapability}
 
 object DataSourceV2Implicits {
   implicit class TableHelper(table: Table) {
@@ -40,6 +40,15 @@ object DataSourceV2Implicits {
       }
     }
 
+    def asDeletable: SupportsDelete = {
+      table match {
+        case support: SupportsDelete =>
+          support
+        case _ =>
+          throw new AnalysisException(s"Table does not support deletes: ${table.name}")
+      }
+    }
+
     def supports(capability: TableCapability): Boolean = table.capabilities.contains(capability)
 
     def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
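The implicit keeps call sites in the planner terse. A hypothetical caller might look like this (the `deleteFromTable` helper is illustrative, not part of the patch):

```scala
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.Table

// Resolves the table to SupportsDelete, or raises
// "Table does not support deletes: <name>" as an AnalysisException.
def deleteFromTable(table: Table, filters: Array[Filter]): Unit = {
  table.asDeletable.deleteWhere(filters)
}
```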
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 4d42f5fb7336..1cc5dd8ce1d5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -531,7 +531,8 @@ class AnalysisErrorSuite extends AnalysisTest {
     val plan = Project(
       Seq(a, Alias(InSubquery(Seq(a), ListQuery(LocalRelation(b))), "c")()),
       LocalRelation(a))
-    assertAnalysisError(plan, "Predicate sub-queries can only be used in a Filter" :: Nil)
+    assertAnalysisError(plan, "Predicate sub-queries can only be used" +
+      " in Filter/DeleteFromTable" :: Nil)
   }
 
   test("PredicateSubQuery is used is a nested condition") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index a150a049f33e..4791fe5fb525 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils,
   UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
 import org.apache.spark.sql.execution.datasources.v2.{CatalogTableAsV2, DataSourceV2Relation}
@@ -173,9 +173,18 @@ case class DataSourceResolution(
       // only top-level adds are supported using AlterTableAddColumnsCommand
       AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField))
 
+    case DeleteFromStatement(AsTableIdentifier(table), _, _) =>
+      throw new AnalysisException(
+        "Delete from tables is not supported using the legacy / v1 Spark external catalog" +
+          s" API. Identifier: $table.")
+
+    case delete: DeleteFromStatement =>
+      val relation = UnresolvedRelation(delete.tableName)
+      val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
+      DeleteFromTable(aliased, delete.condition)
+
     case DataSourceV2Relation(CatalogTableAsV2(catalogTable), _, _) =>
       UnresolvedCatalogRelation(catalogTable)
-
   }
 
   object V1WriteProvider {
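The alias handling above is worth a second look: when `DELETE FROM t AS x WHERE x.id = 2` is parsed, the alias has to wrap the relation so that `x`-qualified attributes in the condition can resolve. A small sketch of the equivalent plan construction (the identifiers are illustrative):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias

val relation = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"))
// Mirrors delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation):
val aliased = Some("x").map(SubqueryAlias(_, relation)).getOrElse(relation)
// DeleteFromTable(aliased, condition) is then resolved by the analyzer as usual.
```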
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 91fc2e068af7..585fe06ce4ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalog.v2.StagingTableCatalog
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -222,6 +222,15 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
       OverwritePartitionsDynamicExec(r.table.asWritable, r.options, planLater(query)) :: Nil
 
+    case DeleteFromTable(r: DataSourceV2Relation, condition) =>
+      // Fail if any filter cannot be converted. Correctness depends on removing all matching data.
+      val filters = splitConjunctivePredicates(condition).map {
+        f => DataSourceStrategy.translateFilter(f).getOrElse(
+          throw new AnalysisException("Exec delete failed:" +
+            s" cannot translate expression to source filter: $f"))
+      }.toArray
+      DeleteFromTableExec(r.table.asDeletable, filters) :: Nil
+
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
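The translation step is the crux of this strategy case: `splitConjunctivePredicates` breaks the condition on `And`, and every conjunct must map onto a public `Filter`, otherwise the delete is rejected rather than deleting too little. A sketch of the behavior (to be run from Spark's own `sql` package, since `translateFilter` is package-private; the results in comments are illustrative):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

// A simple comparison against a literal translates cleanly...
DataSourceStrategy.translateFilter('id.int === 2)
// => Some(EqualTo("id", 2))

// ...while a predicate with no source-level equivalent does not, and the
// getOrElse above turns the None into an AnalysisException.
DataSourceStrategy.translateFilter('id.int === 'p.int)
// => None (column-to-column comparisons have no Filter counterpart)
```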
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
new file mode 100644
index 000000000000..a5840571fff2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.LeafExecNode
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.v2.SupportsDelete
+
+case class DeleteFromTableExec(
+    table: SupportsDelete,
+    condition: Array[Filter]) extends LeafExecNode {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    table.deleteWhere(condition)
+    sparkContext.emptyRDD
+  }
+
+  override def output: Seq[Attribute] = Nil
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
index cf77998c122f..5648d5439ba5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2WriteSupportCheck.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.expressions.{Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteFromTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.BooleanType
 
@@ -51,6 +51,11 @@ object V2WriteSupportCheck extends (LogicalPlan => Unit) {
       }
     }
 
+    case DeleteFromTable(_, condition) =>
+      if (SubqueryExpression.hasSubquery(condition)) {
+        failAnalysis(s"Delete by condition with subquery is not supported: $condition")
+      }
+
     case _ => // OK
   }
 }
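Because `V2WriteSupportCheck` is just a `LogicalPlan => Unit` function, it slots into the analyzer's extended check rules. For a build that wires things up manually, registration could look like this sketch (the extension class name is hypothetical):

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.execution.datasources.v2.V2WriteSupportCheck

class MyV2Extensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Runs after analysis; fails queries like DELETE ... WHERE id IN (SELECT ...)
    extensions.injectCheckRule(_ => V2WriteSupportCheck)
  }
}
```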
"testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") + sql(s"DELETE FROM $t tbl WHERE tbl.id = 2") + checkAnswer(spark.table(t), Seq( + Row(3, "c", 3))) + } + } + + test("DeleteFrom: fail if has subquery") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)") + sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)") + val exc = intercept[AnalysisException] { + sql(s"DELETE FROM $t WHERE id IN (SELECT id FROM $t)") + } + + assert(spark.table(t).count === 3) + assert(exc.getMessage.contains("Delete by condition with subquery is not supported")) + } + } + private def testCreateAnalysisError(sqlStatement: String, expectedError: String): Unit = { val errMsg = intercept[AnalysisException] { sql(sqlStatement) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala index 7c51a29bde90..109dc198b083 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.sources.{And, EqualTo, Filter} +import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull} import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.types.StructType @@ -117,7 +117,7 @@ class InMemoryTable( val schema: StructType, override val partitioning: Array[Transform], override val properties: util.Map[String, String]) - extends Table with SupportsRead with SupportsWrite { + extends Table with SupportsRead with SupportsWrite with SupportsDelete { partitioning.foreach { t => if (!t.isInstanceOf[IdentityTransform]) { @@ -220,35 +220,50 @@ class InMemoryTable( private class Overwrite(filters: Array[Filter]) extends TestBatchWrite { override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { - val deleteKeys = dataMap.keys.filter { partValues => + dataMap --= deletesKeys(filters) + withData(messages.map(_.asInstanceOf[BufferedRows])) + } + } + + private object TruncateAndAppend extends TestBatchWrite { + override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { + dataMap.clear + withData(messages.map(_.asInstanceOf[BufferedRows])) + } + } + + override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized { + dataMap --= deletesKeys(filters) + } + + private def splitAnd(filter: Filter): Seq[Filter] = { + filter match { + case And(left, right) => splitAnd(left) ++ splitAnd(right) + case _ => filter :: Nil + } + } + + private def deletesKeys(filters: Array[Filter]): Iterable[Seq[Any]] = { + 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 7c51a29bde90..109dc198b083 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{IdentityTransform, Transform
 import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.sources.{And, EqualTo, Filter}
+import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
 import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriter, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
@@ -117,7 +117,7 @@ class InMemoryTable(
     val schema: StructType,
     override val partitioning: Array[Transform],
     override val properties: util.Map[String, String])
-  extends Table with SupportsRead with SupportsWrite {
+  extends Table with SupportsRead with SupportsWrite with SupportsDelete {
 
   partitioning.foreach { t =>
     if (!t.isInstanceOf[IdentityTransform]) {
@@ -220,35 +220,50 @@ class InMemoryTable(
 
   private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
-      val deleteKeys = dataMap.keys.filter { partValues =>
+      dataMap --= deleteKeys(filters)
+      withData(messages.map(_.asInstanceOf[BufferedRows]))
+    }
+  }
+
+  private object TruncateAndAppend extends TestBatchWrite {
+    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
+      dataMap.clear
+      withData(messages.map(_.asInstanceOf[BufferedRows]))
+    }
+  }
+
+  override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
+    dataMap --= deleteKeys(filters)
+  }
+
+  private def splitAnd(filter: Filter): Seq[Filter] = {
+    filter match {
+      case And(left, right) => splitAnd(left) ++ splitAnd(right)
+      case _ => filter :: Nil
+    }
+  }
+
+  private def deleteKeys(filters: Array[Filter]): Iterable[Seq[Any]] = {
+    dataMap.synchronized {
+      dataMap.keys.filter { partValues =>
         filters.flatMap(splitAnd).forall {
           case EqualTo(attr, value) =>
-            partFieldNames.zipWithIndex.find(_._1 == attr) match {
-              case Some((_, partIndex)) =>
-                value == partValues(partIndex)
-              case _ =>
-                throw new IllegalArgumentException(s"Unknown filter attribute: $attr")
-            }
+            value == extractValue(attr, partValues)
+          case IsNotNull(attr) =>
+            null != extractValue(attr, partValues)
           case f =>
             throw new IllegalArgumentException(s"Unsupported filter type: $f")
         }
       }
-      dataMap --= deleteKeys
-      withData(messages.map(_.asInstanceOf[BufferedRows]))
-    }
-
-    private def splitAnd(filter: Filter): Seq[Filter] = {
-      filter match {
-        case And(left, right) => splitAnd(left) ++ splitAnd(right)
-        case _ => filter :: Nil
-      }
-    }
   }
-
-  private object TruncateAndAppend extends TestBatchWrite {
-    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
-      dataMap.clear
-      withData(messages.map(_.asInstanceOf[BufferedRows]))
+
+  private def extractValue(attr: String, partValues: Seq[Any]): Any = {
+    partFieldNames.zipWithIndex.find(_._1 == attr) match {
+      case Some((_, partIndex)) =>
+        partValues(partIndex)
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown filter attribute: $attr")
     }
   }
 }
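One detail of the test table that is easy to miss: `deleteWhere` may receive a single `And` tree rather than a pre-split array, so `splitAnd` normalizes both shapes to the same conjunct list before matching partition keys. A quick illustration of the helper copied from the patch (the result in the comment is illustrative):

```scala
import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}

def splitAnd(filter: Filter): Seq[Filter] = filter match {
  case And(left, right) => splitAnd(left) ++ splitAnd(right)
  case _ => filter :: Nil
}

splitAnd(And(And(EqualTo("id", 2), EqualTo("p", 3)), IsNotNull("id")))
// => List(EqualTo(id,2), EqualTo(p,3), IsNotNull(id))
```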