
Conversation

@RussellSpitzer
Member

Changes all Deleting Spark Actions to use FileIO Bulk Operations, adds Bulk delete to HadoopIO

The basic idea here is that all of our deletes should use the bulk API or have their parallelism controlled primarily at the FileIO level. All deletes should use some parallelism by default.

…ailable

Previously, deletes were handled by a per-Action executor service that was used
to parallelize single deletes. In this PR we move the responsibility for performing
the deletes, and for parallelizing them, to the FileIO via SupportsBulkOperations.

This deprecates all methods that were previously used for doing single deletes, as
well as the passing of executor services to Actions which delete many files.
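
For context, the bulk path hands the whole collection of paths to the FileIO in a single call and lets the FileIO decide batching and parallelism. A minimal sketch of the idea, assuming a FileIO that may implement SupportsBulkOperations (the helper class and method are hypothetical, not code from this PR):

import java.util.List;

import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;

class BulkDeleteSketch {
  // Hand the whole list to the FileIO in one call when it supports bulk
  // operations; the FileIO then controls batching and parallelism.
  static void deleteAll(FileIO io, List<String> paths) {
    if (io instanceof SupportsBulkOperations) {
      try {
        ((SupportsBulkOperations) io).deleteFiles(paths);
      } catch (BulkDeletionFailureException e) {
        // the exception reports how many objects failed to delete
        System.err.printf("Failed to delete %d files%n", e.numberFailedObjects());
      }
    } else {
      // fall back to single-file deletes when bulk is unavailable
      paths.forEach(io::deleteFile);
    }
  }
}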
@amogh-jahagirdar
Contributor

Thanks a ton for closing the loop on this, @RussellSpitzer! Left some comments.

@aokolnychyi
Contributor

I am getting to this today, hopefully.

Contributor

@dramaticlly left a comment

Would love to see this merged, thank you @RussellSpitzer

@aokolnychyi
Contributor

I agree with the overall direction, but I'd try to support the existing API to avoid massive deprecation and to simplify the implementation. It will be hard to test all possible scenarios.

.onFailure(
    (f, e) -> {
      LOG.error("Failure during bulk delete on file: {} ", f, e);
      failureCount.incrementAndGet();
Contributor

This is going to increment the count on each failed attempt and won't be accurate. We could count the number of successfully deleted files instead and then use Iterables.size(pathsToDelete) to find how many we were supposed to delete.
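
A rough sketch of that suggestion, assuming the Tasks utility and a FileIO handle (the wrapper class and method name are hypothetical); as the follow-up below shows, the existing counter turns out to be accurate, so this alternative was ultimately not needed:

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.Tasks;

class SuccessCountSketch {
  // Count successful deletes and derive the failure count from the total,
  // so repeated retry attempts cannot inflate the number.
  static int deleteAndCountFailures(FileIO io, Iterable<String> pathsToDelete) {
    AtomicInteger successCount = new AtomicInteger(0);
    Tasks.foreach(pathsToDelete)
        .retry(3)
        .suppressFailureWhenFinished()
        .run(
            path -> {
              io.deleteFile(path);
              successCount.incrementAndGet();
            });
    return Iterables.size(pathsToDelete) - successCount.get();
  }
}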

Member Author

Ah, I thought it was once per element.

Member Author

I'm not sure we want to go over the iterable more than once ... let me think about this

Member Author

I double-checked this: onFailure only fires off when all retries are exhausted, so it is correct as is.

scala> def testFailure() = {
  var failureCount = 0
  Tasks.foreach("value")
    .retry(3)
    .onFailure((y, x: Throwable) => failureCount += 1)
    .suppressFailureWhenFinished()
    .run(x => throw new Exception("ohNO"))
  failureCount
}

scala> testFailure()
23/03/01 10:16:22 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
23/03/01 10:16:23 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
23/03/01 10:16:25 WARN Tasks: Retrying task after failure: ohNO
java.lang.Exception: ohNO
...
res21: Int = 1

Member Author

try {
  runTaskWithRetry(task, item); // performs all retries internally
  succeeded.add(item);
} catch (Exception e) {
  exceptions.add(e);
  if (onFailure != null) {
    tryRunOnFailure(item, e); // fires once, after retries are exhausted
  }
}

The code in question (runTaskWithRetry) does all retries before hitting "onFailure".

Contributor

You are right, we overlooked it while reviewing another PR. I like it more. I'll update SparkCleanupUtil to follow this pattern as well.

    .suppressFailureWhenFinished()
    .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
    .run(deleteFunc::accept);

if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
Member Author

This is the new pattern:

if (bulk) {
  bulk delete
} else {
  if (no custom delete) {
    table.io delete
  } else {
    custom delete
  }
}

This logic is repeated in all of the actions; see the sketch below.
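
Spelled out in Java, that dispatch might look like the following sketch (the method and parameter names are illustrative, not the exact code in the actions):

import java.util.function.Consumer;

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;

class DeleteDispatchSketch {
  // Prefer the bulk path; otherwise fall back to per-file deletes,
  // honoring a caller-supplied delete function when one was given.
  static void delete(FileIO io, Consumer<String> deleteFunc, Iterable<String> files) {
    if (deleteFunc == null && io instanceof SupportsBulkOperations) {
      ((SupportsBulkOperations) io).deleteFiles(files);
    } else if (deleteFunc == null) {
      files.forEach(io::deleteFile); // default single-file delete via FileIO
    } else {
      files.forEach(deleteFunc); // custom delete supplied by the caller
    }
  }
}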

if (deleteFunc == null && io instanceof SupportsBulkOperations) {
  summary = deleteFiles((SupportsBulkOperations) io, files);
} else {

Contributor

I actually meant an empty line after the DeleteSummary var, but the formatting here is up to you.
I like the new pattern.


if (deleteFunc == null) {
  LOG.info(
      "Table IO {} does not support bulk operations. Using non-bulk deletes.", table.io());
Contributor

Same here.

Contributor

@aokolnychyi left a comment

A few non-blocking nits. Looks great otherwise. Thanks, @RussellSpitzer! Feel free to merge whenever you are ready.

@RussellSpitzer
Member Author

Thanks @amogh-jahagirdar, @dramaticlly, and @aokolnychyi! I'll merge when tests pass. I'll do the backport PRs after my Subsurface talk.
