Conversation

@aokolnychyi (Contributor) commented Dec 1, 2020

What changes were proposed in this pull request?

This PR provides us with a way to check if a data source is going to reject the delete via deleteWhere at planning time.

Why are the changes needed?

The only way to support delete statements right now is to implement SupportsDelete. According to its Javadoc, that interface is meant for cases where we can delete data without much effort (e.g., deleting a complete partition in a Hive table).

This PR gives us a way to check, at planning time, whether a data source is going to reject a delete via deleteWhere, instead of only finding out through an exception during execution. In the future, we can use this to decide whether Spark should rewrite the delete and execute a distributed query, or simply pass a set of filters down to the source.

Consider an example of a partitioned Hive table. If we have a delete predicate like part_col = '2020', we can just drop the matching partition to satisfy the delete. In this case, the data source should return true from canDeleteWhere and use the filters it accepts in deleteWhere to drop the partition. I consider this a delete without significant effort. At the same time, if we have a delete predicate like id = 10, a Hive table would not be able to execute the delete as a metadata-only operation without rewriting files. In that case, the data source should return false from canDeleteWhere and we should use a more sophisticated row-level API to find out which records should be removed (that API is yet to be discussed, but we need this PR as a basis).

If we decide to support subqueries and all delete use cases by simply extending the existing API, all data sources will have to implement a lot of Spark logic to determine which records changed. I don't think we want to go that way, as the Spark logic to determine which records should be deleted is independent of the underlying data source. So the assumption is that, for data sources that return false from canDeleteWhere, Spark will execute a plan to find which records must be deleted.
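
To make the distinction concrete, here is a minimal sketch (not part of this PR) of how a partitioned source could implement the proposed API. PartitionedTable and dropPartition are hypothetical names, and a real connector would also implement Table and report the usual capabilities; only SupportsDelete, Filter, and EqualTo are existing Spark classes.

import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.sources.{EqualTo, Filter}

// Hypothetical connector class for illustration only.
class PartitionedTable(partitionColumn: String) extends SupportsDelete {

  // Cheap planning-time check: the delete is "easy" only if every filter is an
  // equality predicate on the partition column.
  override def canDeleteWhere(filters: Array[Filter]): Boolean =
    filters.forall {
      case EqualTo(attr, _) => attr == partitionColumn
      case _                => false
    }

  // Metadata-only delete: drop the partitions matched by the accepted filters.
  override def deleteWhere(filters: Array[Filter]): Unit =
    filters.foreach {
      case EqualTo(_, value) => dropPartition(value)
      case other => throw new IllegalArgumentException(s"Cannot delete by filter: $other")
    }

  // Hypothetical helper standing in for the source's partition-drop logic.
  private def dropPartition(value: Any): Unit = ()
}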

Does this PR introduce any user-facing change?

Yes, but it is backward compatible.

How was this patch tested?

This PR comes with a new test.

@aokolnychyi (Contributor Author)

* Rows should be deleted from the data source iff all of the filter expressions match.
* That is, the expressions must be interpreted as a set of filters that are ANDed together.
* <p>
* Spark will call this method to check if the delete is possible without significant effort.
@dongjoon-hyun (Member), Dec 1, 2020

Although there is an explanation about this, "without significant effort" looks like a misleading assumption here. Some data sources may implement this themselves with significant effort for their own reasons. Can we omit this?

Contributor

This comes from the documentation for deleteWhere:

Implementations may reject a delete operation if the delete isn't possible without significant effort. For example, . . .

I think that phrasing is a bit more clear because it uses "implementations may reject", so what constitutes "significant effort" is determined by the implementation.

I think a clearer way to say it here is to refer to that standard: "Spark will call this method to check whether deleteWhere would reject the delete operation because it requires significant effort."

It would also help to have more context: this is for some sources to determine whether or not a metadata delete can be performed.

Member

I think a clearer way to say it here is to refer to that standard: "Spark will call this method to check whether deleteWhere would reject the delete operation because it requires significant effort."

This sounds better, as there is a shared standard between canDeleteWhere and deleteWhere.

So canDeleteWhere is a much more lightweight way to know whether deleteWhere will reject a delete operation, without actually calling deleteWhere.

@aokolnychyi (Contributor Author), Dec 2, 2020

I agree with you, @dongjoon-hyun @rdblue. I'll update the comment.

@aokolnychyi (Contributor Author), Dec 2, 2020

So canDeleteWhere is a much more lightweight way to know whether deleteWhere will reject a delete operation, without actually calling deleteWhere.

Yes, @viirya. It gives us a way to check, at planning time, whether a data source is going to reject a delete via deleteWhere, instead of only finding out through an exception during execution. In the future, we can use this to decide whether Spark should rewrite the delete and execute a distributed query, or simply pass a set of filters down to the source.

Consider an example of a partitioned Hive table. If we have a delete predicate like part_col = '2020', we can just drop the matching partition to satisfy the delete. In this case, the data source should return true from canDeleteWhere and use the filters it accepts in deleteWhere to drop the partition. I consider this a delete without significant effort. At the same time, if we have a delete predicate like id = 10, a Hive table would not be able to execute the delete as a metadata-only operation without rewriting files. In that case, the data source should return false from canDeleteWhere and we should use a more sophisticated row-level API to find out which records should be removed (that API is yet to be discussed, but we need this PR as a basis).

If we decide to support subqueries and all delete use cases by simply extending the existing API, all data sources will have to implement a lot of Spark logic to determine which records changed. I don't think we want to go that way, as the Spark logic to determine which records should be deleted is independent of the underlying data source. So the assumption is that, for data sources that return false from canDeleteWhere, Spark will execute a plan to find which records must be deleted.

Contributor

My worry is that we may change the API back and forth if we don't have a clear big picture. Right now this patch is not useful as it only changes where we throw the exception, but I can see that this will be useful when we have the row-level delete API and we can use the canDeleteWhere to decide if we want to use the row-level API or not.

This is exactly the reason for adding this API. It is a step toward rewriting plans for row-level DELETE and MERGE operations. The current deleteWhere exception approach happens while running the physical plan, when it is too late to rewrite the plan for row-level changes. Adding the canDeleteWhere check fixes that problem.

Since you can easily see how it will be used, what is the concern about adding this?

Contributor

My concern is not about this PR itself, but about the order of commits. It seems more natural to have the row-level delete API first, and then do this change. At that time we can have tests to verify if we can switch correctly.

If you are working with @aokolnychyi on this feature, and you two think this is better for your development, please go ahead and merge it.

Contributor

Thanks, we do think that it is helpful to have it in this order. This was an early problem that we ran into and the solution is clear.

Member

Can you guys just file related JIRAs for that before merging? That's all I asked.

Contributor Author

Just to update this thread, JIRAs were created. Here is a summary.

* @return true if the delete operation can be performed
*/
default boolean canDeleteWhere(Filter[] filters) {
  return true;
}
Member

Shall we have false as a safer default?

Member

I'm wondering if there is a breaking change, which is why we should have true here?

Contributor

Unfortunately, this would change the assumptions for existing implementations. Right now, if this interface is implemented, Spark will call deleteWhere for the delete. Returning false by default would cause Spark to skip deleteWhere wherever the return value of this method is used.

The original idea was to try to delete using deleteWhere and, if that failed, to run a more expensive delete. But when we started implementing the more expensive delete, we needed to know during job planning, not job execution, whether the metadata-only delete can be done. This method solves that problem.
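
As a hedged sketch of that backward-compatibility argument: an existing implementation that predates canDeleteWhere keeps its old behavior because the default returns true. The LegacyTable class below is hypothetical.

import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.sources.Filter

// Hypothetical pre-existing connector that only implements deleteWhere.
class LegacyTable extends SupportsDelete {
  override def deleteWhere(filters: Array[Filter]): Unit = {
    // existing metadata-only delete logic goes here
  }
  // canDeleteWhere is not overridden, so the new default returns true and Spark
  // keeps calling deleteWhere exactly as it did before this change.
}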

Member

Got it, @rdblue .

Contributor Author

That's correct, and the method returns true by default to keep the old behavior.

@rdblue (Contributor) commented Dec 1, 2020

Looks good overall.

@github-actions bot added the SQL label Dec 1, 2020
default boolean canDeleteWhere(Filter[] filters) {
  return true;
}

Member

deleteWhere only possibly rejects a delete operation, even when the delete isn't possible without significant effort.

If canDeleteWhere returns false, does it mean deleteWhere will definitely reject the delete operation?

Contributor

I think that would be the case. In our use case, we can tell whether a delete is aligned with partitioning for this check. But, we can also scan through data to determine whether files themselves are fully matched (or not matched) by the filter. We would do the partitioning check here and the more expensive stats-based check in deleteWhere.
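
A hedged sketch of that two-level split follows. StatsAwareTable, isAlignedWithPartitioning, and filesFullyMatched are hypothetical; that deleteWhere rejects by throwing IllegalArgumentException is an assumption based on its "implementations may reject" wording.

import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.sources.Filter

// Hypothetical connector: a cheap check in canDeleteWhere, a more expensive
// stats-based check in deleteWhere.
class StatsAwareTable extends SupportsDelete {

  // Planning-time: only answer whether the delete is aligned with partitioning.
  override def canDeleteWhere(filters: Array[Filter]): Boolean =
    isAlignedWithPartitioning(filters)

  // Execution-time: consult file-level stats; reject if files are only partially matched.
  override def deleteWhere(filters: Array[Filter]): Unit = {
    if (!filesFullyMatched(filters)) {
      throw new IllegalArgumentException("Delete requires rewriting files")
    }
    // drop the fully matched files/partitions here
  }

  // Placeholder helpers; real logic would consult partition specs and file statistics.
  private def isAlignedWithPartitioning(filters: Array[Filter]): Boolean = true
  private def filesFullyMatched(filters: Array[Filter]): Boolean = true
}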

}).toArray

if (!table.asDeletable.canDeleteWhere(filters)) {
  throw new AnalysisException(
Member

Is this exception handled later? Is the rewrite part for row deletion a TBD?

Contributor

The rewrite would happen earlier. This just throws a good error message if deleteWhere will fail.

@aokolnychyi (Contributor Author), Dec 2, 2020

The rewrite part is yet to be done. This PR just adds a way to have more info at planning time. Specifically, we will know if the rewrite is needed.

Member

I see, thanks. So once the rewrite part is ready, this method will be called earlier, before the rewrite, is that right?

Contributor Author

It is going to be called at planning time to check if we should apply the rewrite or just pass filters down.
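
For illustration, a hedged sketch of what that planning-time decision could look like once the rewrite exists. DeleteStrategy, planDelete, and the rewrite path are hypothetical and not part of this PR; only canDeleteWhere is from the proposed API.

import org.apache.spark.sql.connector.catalog.SupportsDelete
import org.apache.spark.sql.sources.Filter

// Hypothetical planner-side decision, not an actual Spark rule.
object DeletePlanning {
  sealed trait DeleteStrategy
  case object MetadataOnlyDelete extends DeleteStrategy // pass the filters down to deleteWhere
  case object RowLevelRewrite extends DeleteStrategy    // rewrite into a distributed plan (future row-level API)

  def planDelete(table: SupportsDelete, filters: Array[Filter]): DeleteStrategy =
    if (table.canDeleteWhere(filters)) MetadataOnlyDelete else RowLevelRewrite
}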

@SparkQA commented Dec 2, 2020

Test build #132039 has finished for PR 30562 at commit 85ea2c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment

+1, LGTM. With @rdblue's and @aokolnychyi's explanation, the updated patch looks clear to me now. Thank you, @aokolnychyi and @rdblue.

Could you review once more, @holdenk @dbtsai @cloud-fan @viirya @sunchao ?

@dongjoon-hyun (Member)

Also, cc @gatorsmile. Please let us know if you have any comments.

@aokolnychyi (Contributor Author)

I've updated the doc per @sunchao's suggestion. Let me know if there are any other open questions.

@SparkQA commented Dec 2, 2020

Test build #132070 has finished for PR 30562 at commit 8b909f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao (Member) left a comment

Thanks @aokolnychyi! The updated doc looks great to me. I'm LGTM (non-binding) on this PR.

@aokolnychyi (Contributor Author)

The recent test failures do not seem related to this PR.

@HyukjinKwon (Member)

Hey, please don't ignore my opinion, and file related JIRAs (#30562 (comment)).

@dongjoon-hyun (Member)

Of course, he will, @HyukjinKwon. We are still under review. In addition to that, is this (#30562 (comment)) resolved?

@HyukjinKwon (Member)

Thanks @dongjoon-hyun. Yes. It's okay to edit the JIRAs later, and I understand the design doc is in progress. I just wanted to make sure we all know what to do after this.

@aokolnychyi (Contributor Author)

Filing JIRAs is something I will do for sure before this one is merged, @HyukjinKwon. I was referring to any open points related to this PR and whether we have enough consensus and everybody is ok with the change. If anybody still has unresolved concerns, let's discuss until we all agree.

@HyukjinKwon (Member)

I am okay with this PR. I have no unresolved concerns, except that I prefer to know and make sure we have a plan. The easiest way should be to file the relevant JIRAs.

@aokolnychyi (Contributor Author)

I've created SPARK-33642 as a parent JIRA and 3 subtasks for DELETE/UPDATE/MERGE. The parent one is where the design doc should be and where the discussion should happen.

I don't see any open points on this PR now but I propose we wait a bit more for additional feedback to make sure everybody is on the same page.

@dongjoon-hyun (Member)

Thank you all! I'll merge this for Apache Spark 3.1.
