@@ -28,8 +28,30 @@
*/
@Evolving
public interface SupportsDelete {

/**
* Checks whether it is possible to delete data from a data source table that matches filter
* expressions.
* <p>
* Rows should be deleted from the data source iff all of the filter expressions match.
* That is, the expressions must be interpreted as a set of filters that are ANDed together.
* <p>
* Spark will call this method at planning time to check whether {@link #deleteWhere(Filter[])}
* would reject the delete operation because it requires significant effort. If this method
* returns false, Spark will not call {@link #deleteWhere(Filter[])} and will try to rewrite
* the delete operation and produce row-level changes if the data source table supports deleting
* individual records.
*
* @param filters filter expressions, used to select rows to delete when all expressions match
* @return true if the delete operation can be performed
*/
default boolean canDeleteWhere(Filter[] filters) {
return true;
Member:
Shall we have false as a safer default?

Member:
I'm wondering if there is a breaking-change concern, which is why we should have true here?

Contributor:
Unfortunately, this would change the assumptions for existing implementations. Right now, if this interface is implemented, Spark will call deleteWhere for the delete. Returning false by default would cause Spark to skip deleteWhere wherever this method's return value is consulted.

The original idea was to try to delete using deleteWhere and, if that fails, to run a more expensive delete. But when we started implementing the more expensive delete, we needed to know during job planning, not job execution, whether the metadata-only delete can be done. This method solves that problem.

Member:
Got it, @rdblue.

Contributor Author:
That's correct, and the method returns true by default to keep the old behavior.

}
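
To make the ANDed-filter contract in the canDeleteWhere javadoc above concrete, here is a small illustrative sketch in Scala (the statement, column names, and values are hypothetical, not taken from this PR) of the filters a connector receives for a simple DELETE:

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

// For a statement like `DELETE FROM events WHERE date = '2020-12-01' AND region IS NOT NULL`,
// Spark pushes down one Filter per conjunct; a row is deleted only when every
// filter in the array matches, i.e. the filters are ANDed together.
val pushedFilters: Array[Filter] = Array(
  EqualTo("date", "2020-12-01"),
  IsNotNull("region")
)
// At planning time, canDeleteWhere(pushedFilters) decides whether
// deleteWhere(pushedFilters) will be called at all.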

Member:
deleteWhere only possibly rejects a delete operation, even when the delete isn't possible without significant effort.

If canDeleteWhere returns false, does it mean deleteWhere will definitely reject the delete operation?

Contributor:
I think that would be the case. In our use case, we can tell whether a delete is aligned with partitioning for this check. But, we can also scan through data to determine whether files themselves are fully matched (or not matched) by the filter. We would do the partitioning check here and the more expensive stats-based check in deleteWhere.
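
As a rough illustration of the two-level check described in this comment, here is a hedged Scala sketch; the object name, the partition-column set, and the supported filter types are assumptions made for the example, not code from this PR:

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

// Hypothetical connector-side helper: a cheap partition-alignment check backs
// canDeleteWhere at planning time, while a more expensive stats- or file-level
// check would be deferred to deleteWhere itself.
object PartitionAlignedDeleteCheck {
  def canDeleteWhere(partitionColumns: Set[String], filters: Array[Filter]): Boolean =
    filters.forall {
      case EqualTo(attr, _) => partitionColumns.contains(attr)
      case IsNotNull(attr)  => partitionColumns.contains(attr)
      case _                => false
    }
}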

/**
* Delete data from a data source table that matches filter expressions. Note that this method
* will be invoked only if {@link #canDeleteWhere(Filter[])} returns true.
* <p>
* Rows are deleted from the data source iff all of the filter expressions match. That is, the
* expressions must be interpreted as a set of filters that are ANDed together.
@@ -335,6 +335,10 @@ class InMemoryTable(
}
}

override def canDeleteWhere(filters: Array[Filter]): Boolean = {
InMemoryTable.supportsFilters(filters)
}

override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
dataMap --= InMemoryTable.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted), filters)
@@ -360,6 +364,14 @@ object InMemoryTable {
}
}

def supportsFilters(filters: Array[Filter]): Boolean = {
filters.flatMap(splitAnd).forall {
case _: EqualTo => true
case _: IsNotNull => true
case _ => false
}
}
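
A hedged usage sketch of the helper added above (assuming the test-only InMemoryTable object is on the classpath); it shows which pushed-down filters the in-memory connector treats as supported:

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, IsNotNull}

// Only EqualTo and IsNotNull conjuncts are supported, so a range predicate
// makes the whole filter set unsupported.
InMemoryTable.supportsFilters(Array[Filter](EqualTo("p", 3), IsNotNull("id")))       // true
InMemoryTable.supportsFilters(Array[Filter](EqualTo("p", 3), GreaterThan("id", 3L))) // false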

private def extractValue(
attr: String,
partFieldNames: Seq[String],
@@ -221,6 +221,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
throw new AnalysisException(s"Exec update failed:" +
s" cannot translate expression to source filter: $f"))
}).toArray

if (!table.asDeletable.canDeleteWhere(filters)) {
throw new AnalysisException(
Member:
Is this exception handled later? Is the rewrite part for row deletion a TBD?

Contributor:
The rewrite would happen earlier. This just throws a good error message if deleteWhere would fail.

Contributor Author (@aokolnychyi, Dec 2, 2020):
The rewrite part is yet to be done. This PR just adds a way to have more info at planning time. Specifically, we will know if the rewrite is needed.

Member:
I see, thanks. So once the rewrite part is ready, this method will be called in an earlier place, before the rewrite, is that right?

Contributor Author:
It is going to be called at planning time to check if we should apply the rewrite or just pass filters down.

s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
}

DeleteFromTableExec(table.asDeletable, filters) :: Nil
case _ =>
throw new AnalysisException("DELETE is only supported with v2 tables.")
@@ -1811,6 +1811,20 @@ class DataSourceV2SQLSuite
}
}

test("DeleteFrom: delete with unsupported predicates") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo")
sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
val exc = intercept[AnalysisException] {
sql(s"DELETE FROM $t WHERE id > 3 AND p > 3")
}

assert(spark.table(t).count === 3)
assert(exc.getMessage.contains(s"Cannot delete from table $t"))
}
}
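
For contrast with the new test above, a hedged companion snippet (not part of this PR's changes, modeled on the existing delete tests in this suite) showing a delete whose predicates the in-memory table does support:

test("DeleteFrom: delete with supported predicates (illustration)") {
  val t = "testcat.ns1.ns2.tbl"
  withTable(t) {
    sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
    sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
    // `id = 2` translates to an EqualTo filter, so canDeleteWhere accepts it
    // and the delete is pushed straight down to deleteWhere.
    sql(s"DELETE FROM $t WHERE id = 2")
    assert(spark.table(t).count === 1)
  }
}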

test("DeleteFrom: DELETE is only supported with v2 tables") {
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)