Spark: Parallelize initializing readTasks when localityPreferred is true #2800
Conversation
    .stopOnFailure()
    .executeWith(readTasksInitExecutorService)
    .run(task -> {
      readTasks.add(new ReadTask<>(
This can't alter readTasks from multiple threads because array lists aren't thread-safe.
Oh, that was a mistake. Fixed it!
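The thread-safety concern above is worth seeing concretely. Below is a minimal, hypothetical sketch (not the PR's actual code; class and variable names are invented) showing the pattern the fix adopts: worker threads adding to a plain `ArrayList` must hold the same lock, because `ArrayList.add` is not safe to call concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedAddSketch {
    // Fill a plain ArrayList from multiple threads, guarding each add
    // with the list itself as the lock (ArrayList is not thread-safe).
    public static List<Integer> parallelFill(int n) throws InterruptedException {
        List<Integer> results = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < n; i++) {
            final int task = i;
            pool.submit(() -> {
                synchronized (results) {
                    results.add(task);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> out = parallelFill(1000);
        // every task lands exactly once, though the order is nondeterministic
        if (out.size() != 1000) throw new AssertionError("lost adds: " + out.size());
    }
}
```

Without the `synchronized` block, concurrent `add` calls can lose elements or throw `ArrayIndexOutOfBoundsException` during internal resizing.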
Oops, I also fixed this a few days ago and created another PR #2803 just a moment ago.
        task, tableBroadcast, expectedSchemaString, caseSensitive,
        localityPreferred, InternalRowReaderFactory.INSTANCE);
    synchronized (readTasks) {
      readTasks.add(readTask);
Maybe it's better to use an array instead of a list, so we can avoid adding a lock or a temporary variable?
Yes, using an array would avoid the lock, but the time spent holding the lock should be insignificant compared to the block-location lookup. Also, the return value is a list, and I didn't want to do all the conversion.
Yeah! But another reason is that we can keep the style consistent with the spark3 module, which has a small optimization using an array, as at :R153. What do you think?
Okay!
    InputPartition<ColumnarBatch> readTask = new ReadTask<>(
        task, tableBroadcast, expectedSchemaString, caseSensitive,
        localityPreferred, new BatchReaderFactory(batchSize));
    synchronized (readTasks) {
Why not use a concurrent list?
At first I wanted to use a concurrent list. But that would change the return value from a plain list to a concurrent one, and since the subsequent operations on this list are read-only, the concurrent version might hurt performance.
I changed the code to use arrays like in spark3 to avoid synchronization.
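The array-based approach the author switched to can be sketched as follows. This is an illustrative, simplified example (names are hypothetical, not the PR's actual code): because each worker writes to a distinct, pre-assigned array index, no lock is needed at all.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ArrayFillSketch {
    // Initialize a fixed-size array in parallel. Each worker writes only
    // its own slot, so the writes never conflict and need no lock.
    public static String[] parallelInit(int n) throws InterruptedException {
        String[] tasks = new String[n]; // size is known up front
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < n; i++) {
            final int index = i;
            pool.submit(() -> tasks[index] = "task-" + index);
        }
        pool.shutdown();
        // awaitTermination establishes the happens-before edge that makes
        // the workers' writes visible to the reading thread
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return tasks;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] out = parallelInit(8);
        for (int i = 0; i < out.length; i++) {
            if (!("task-" + i).equals(out[i])) throw new AssertionError("slot " + i);
        }
    }
}
```

This mirrors the `Tasks.range(readTasks.length)` pattern in the diff above: the range index doubles as the array slot, which also keeps the result in the original task order.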
Force-pushed from 7000ecc to 1313fa9
    Tasks.range(readTasks.length)
        .stopOnFailure()
        .executeWith(readTasksInitExecutorService)
        .run(index -> {
The braces are unnecessary because the body is a single expression; can you remove them?
Done
    private Filter[] pushedFilters = NO_FILTERS;
    private final boolean localityPreferred;
    private final int batchSize;
    private ExecutorService readTasksInitExecutorService = DEFAULT_READTASKS_INIT_EXECUTOR_SERVICE;
I don't think that there is a need for this field and I'd prefer not to add mutable state. Can you refactor this to call executeWith(ThreadPools.getWorkerPool()) instead? You can pass null to that method so it can also check localityPreferred inline:
.executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)
I like this suggestion because it really simplifies the code path without introducing any static or local variables.
That really simplifies the code. Thanks @rdblue
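The inline-executor pattern suggested above can be sketched with plain JDK types. This is a hypothetical stand-in for Iceberg's `Tasks.range(...).executeWith(service).run(body)` API (the helper below is invented for illustration): passing `null` as the executor means "run on the calling thread", which is exactly what makes `localityPreferred ? ThreadPools.getWorkerPool() : null` work as a one-liner.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

public class ConditionalExecutorSketch {
    // Hypothetical stand-in for Tasks.range(n).executeWith(service).run(body).
    // A null executor falls back to running sequentially on the caller.
    static void runRange(int n, ExecutorService service, IntConsumer body)
            throws InterruptedException {
        if (service == null) {
            for (int i = 0; i < n; i++) {
                body.accept(i);
            }
            return;
        }
        for (int i = 0; i < n; i++) {
            final int index = i;
            service.submit(() -> body.accept(index));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        boolean localityPreferred = true;
        AtomicInteger count = new AtomicInteger();
        // choose the pool inline, as the review suggests; no field needed
        runRange(16, localityPreferred ? Executors.newFixedThreadPool(4) : null,
                 index -> count.incrementAndGet());
        if (count.get() != 16) throw new AssertionError("expected 16, got " + count.get());
    }
}
```

Shutting the pool down inside the helper is a simplification for this sketch; a shared worker pool like `ThreadPools.getWorkerPool()` would of course be left running.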
Thanks to @jshmchenxi for the optimization work for Spark on HDFS (I just noticed this and #2577; it's impressive). Do we also need to parallelize SparkMicroBatchStream#planInputPartitions in this PR?
@openinx Thanks for the reminder, I've added parallelization to SparkMicroBatchStream#planInputPartitions in this PR.
Thanks, @jshmchenxi! I merged this.
Follows #2577: use a thread pool to initialize readTasks when Spark locality is preferred.
Before this, the Spark planning phase could be slow because it used a single thread to obtain the block locations of every file in the scan.
More information can be found in this comment
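The bottleneck this PR addresses, fetching block locations for many files one at a time, can be illustrated with a generic fan-out sketch. The lookup function below is a hypothetical stand-in for an HDFS `getFileBlockLocations`-style call (names invented; this is not Iceberg's code): issuing all lookups concurrently and joining the futures turns N sequential round trips into roughly N / poolSize.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelLocationsSketch {
    // Hypothetical stand-in for a per-file block-location lookup,
    // which in practice is a slow NameNode round trip.
    static String lookupLocations(String file) {
        return "host-for-" + file;
    }

    public static List<String> planWithLocations(List<String> files) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            // issue every lookup concurrently instead of one at a time
            List<CompletableFuture<String>> futures = files.stream()
                .map(f -> CompletableFuture.supplyAsync(() -> lookupLocations(f), pool))
                .collect(Collectors.toList());
            // join preserves the input order, so results line up with files
            return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> out = planWithLocations(List.of("a", "b"));
        if (!out.get(0).equals("host-for-a")) throw new AssertionError(out.toString());
    }
}
```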