
Conversation


@xloya xloya commented Mar 30, 2023

We should always close the delete thread pool once the delete procedure has finished. We have encountered the problem that the number of threads increases sharply after a resident Driver executes a large number of delete-orphan procedures.

@github-actions github-actions bot added the spark label Mar 30, 2023

xloya commented Mar 30, 2023

Another improvement would be to close it only in the CALL procedure, because a new thread pool is created every time the procedure runs, like this:

  protected ExecutorService executorService(int threadPoolSize, String nameFormat) {
    Preconditions.checkArgument(
        executorService == null, "Cannot create a new executor service, one already exists.");
    Preconditions.checkArgument(
        nameFormat != null, "Cannot create a service with null nameFormat arg");
    this.executorService =
        MoreExecutors.getExitingExecutorService(
            (ThreadPoolExecutor)
                Executors.newFixedThreadPool(
                    threadPoolSize,
                    new ThreadFactoryBuilder()
                        .setDaemon(true)
                        .setNameFormat(nameFormat + "-%d")
                        .build()));

    return executorService;
  }
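For reference, a minimal sketch of what closing that pool at the end of the CALL could look like (illustrative only; closeService is an assumed name, not necessarily the real method):

  // Illustrative sketch: shut down the pool created by executorService(int, String)
  // once the CALL has finished, so a long-lived driver does not accumulate idle
  // threads across many procedure invocations.
  protected void closeService() {
    if (executorService != null) {
      executorService.shutdown();   // stop accepting new tasks; running deletes still complete
      executorService = null;       // let the next CALL build a fresh pool
    }
  }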

@hililiwei

Nice catch.


@sririshindra sririshindra left a comment


I think you should not shut down the executor service within the SparkActions, because the ExecutorService is passed in by the user, who may reuse it elsewhere in their code. For instance, a user might create an ExecutorService, pass it to a DeleteOrphanFilesSparkAction, and once that finishes, pass the same ExecutorService to an ExpireSnapshotsSparkAction. If you shut down the executor service inside DeleteOrphanFilesSparkAction itself, the subsequent ExpireSnapshotsSparkAction will fail. We should give users the flexibility to reuse their ExecutorService objects.

Like you alluded to in the PR, you should instead shut down the executor service within the Spark procedure's code, where a new executor service is created every time the procedure is called.

But I am still confused about one thing. Why are you seeing a sharp spike in the number of threads when the driver executes a large number of DeleteOrphanFiles actions? How are you calling the action? Are you using the RemoveOrphanFiles procedure, or are you using the Java API to call DeleteOrphanFilesSparkAction directly?

I would imagine that if you are using the procedure, the ExecutorService created within the procedure's code will eventually be garbage collected, thereby shutting down the thread pool it owns. You are probably seeing the spike between garbage collections. That is a valid problem, and shutting down the executor service within the procedure will address it. On the other hand, if you are using the Java API, it is the user's responsibility to ensure that their executor services are shut down after the SparkAction completes.

One other thing to check: are you passing a custom deleteFunc, and/or does your underlying file system not support bulk operations? If you did not pass a deleteFunc and your underlying file system supports bulk operations, then the ExecutorService is never created within the procedure; the delete operations are handled by the underlying IO API instead. Both S3FileIO and HadoopFileIO create their own executor services (see the private ExecutorService executorService() method in each), which should be shut down in that code rather than in the procedure's code.


xloya commented Mar 31, 2023

(Quoting @sririshindra's review comment above.)

Yes, I also think it's a bit inappropriate to close it directly in the Action, so I'll close the thread pool in the Procedure instead. We did indeed see thread spikes after a large number of procedure calls in a resident Driver; internally we enforce a limit on thread count, and exceeding it caused the driver to be killed on a schedule. I also think it's good practice for code to shut down a thread pool once it's done with it. In addition, we haven't merged the BulkOperations PR you mentioned into our internal branch, so we hadn't considered that change, but I can add it later.

@xloya xloya changed the title Spark: Close the delete threads pool in some actions like DeleteOrphan or ExpireSnapshots Spark: Close the delete threads pool in some procedures like DeleteOrphan and ExpireSnapshots Mar 31, 2023

xloya commented Mar 31, 2023

@sririshindra @RussellSpitzer FYI, thanks.

Inline comment on this code in the diff:

}
}

if (executorService() != null) {
Contributor

New line before and after this code block

Contributor

Actually, shutting down the executor service here is not appropriate because a client might create a single S3FileIO object and call multiple methods on it. For instance, one can call the deleteFiles method multiple times on the same S3FileIO object, so if we shut down the executor service, the second deleteFiles call will fail.

You could actually add executorService = null at the end of this snippet. That way, if anyone calls deleteFiles multiple times on the same S3FileIO object, a new ExecutorService will be recreated. But that would defeat the purpose of the executorService() method, whose point, I think, is to reuse the executor service.

I think it might actually be better to remove this snippet entirely in this case and let the garbage collector take care of shutting down the executor service. But that would probably not solve the issue that you are facing.

You could maybe add a method like resetExecutorService and call it in your fork. Or you could make executorService static, so that the same thread pool is reused every time and you wouldn't see too many threads created at once. But that comes with its own problem: the awsProperties object needed to create the executor service belongs to the S3FileIO instance, not the class, so that would be an issue.
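A rough sketch of the lazy-pool-plus-reset idea (illustrative; resetExecutorService and deleteThreads are placeholders, not the actual S3FileIO members):

  // Illustrative sketch: a lazily created pool with an explicit reset, so a later
  // deleteFiles() call on the same FileIO instance rebuilds the pool instead of
  // hitting an already-terminated executor.
  private ExecutorService executorService;

  private synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = Executors.newFixedThreadPool(deleteThreads);  // deleteThreads: placeholder config
    }
    return executorService;
  }

  private synchronized void resetExecutorService() {
    if (executorService != null) {
      executorService.shutdown();
      executorService = null;   // the next executorService() call creates a new pool
    }
  }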

Contributor Author

Then I think it would be better to add a method to FileIO that accepts a custom executorService for executing the deletes. That way we can also close the procedure's thread pool uniformly in the upper layer.
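Something along these lines, perhaps (purely a sketch of the proposal; this overload does not exist in FileIO today):

  // Hypothetical overload: the caller supplies the pool used for parallel deletes,
  // so the procedure layer can create it, pass it down, and close it uniformly.
  void deleteFiles(Iterable<String> pathsToDelete, ExecutorService executorService);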

Contributor

That sounds reasonable to me. Let's see if others have any different opinion about it.

@github-actions github-actions bot removed the AWS label Mar 31, 2023

Inline comment on this code in ExpireSnapshotsProcedure:

private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);

private ExecutorService executorService = null;
Contributor

@aokolnychyi aokolnychyi Apr 4, 2023


There is already an executorService variable in BaseProcedure, and modifyIcebergTable() must already close that executor service if it has been initialized. Could you check why that logic does not work for you?

Also, keep in mind that FileIO implementations have their own thread pools after PR #6682.
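Roughly the modifyIcebergTable() pattern referred to above (a sketch, not the literal BaseProcedure source):

  // Sketch: the procedure wraps the table modification and closes the pool it
  // created in a finally block, so every CALL cleans up its own threads.
  protected <T> T modifyIcebergTable(Identifier ident, Function<Table, T> func) {
    try {
      return execute(ident, true, func);
    } finally {
      closeService();  // shuts down executorService if executorService(int, String) created one
    }
  }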


@xloya xloya Apr 6, 2023


Thanks, I noticed. I will close this PR.


@xloya xloya Apr 6, 2023


We are still using Spark 3.1, and I see that the RemoveOrphanFiles procedure and the ExpireSnapshots procedure in Spark 3.1 don't have the code to close the thread pool. If necessary, I can open a new PR to backport this logic to Spark 3.1.

@xloya xloya closed this Apr 6, 2023
