Spark: Add Multi-thread to construct ReadTask #2803
Conversation
cc @StefanXiepj
@southernriver Hi, you can refer to the comments in #2577 if you are interested, like using
Got it, I'm new to Iceberg, thank you.
Thank you for your submission @southernriver! This seems like a somewhat large behavior change. Can you open a GitHub issue to track this / for general discussion? Or possibly this has been discussed on the mailing list (in which case you could open an issue and link to the dev list discussion)? Also, an example query / situation where you encounter this could be added there, which would help greatly. I might be thinking of something else, but I thought that at a certain point, planning is distributed once a certain heuristic is passed.

One thing I'd like to see discussed is the ability to opt into this behavior. I'd be more comfortable with this change if we could (at least initially) allow users to opt into or out of using the thread pool to do the planning. I imagine there are scenarios in which it's a detriment. An issue (and then either the dev list or that issue) would be a much better place to discuss that, in my opinion.

I'm not necessarily averse to this change, but I do think having an issue to track a relatively large change in functionality (single threaded to multithreaded) would be nice, as this is a pretty large change in behavior with no way to really opt out (or any documentation on the new table property). Also, if you could more completely describe the query where you're encountering these times and what your dataset looks like (ideally in the issue), that would be great. It's possible that with so many files, OPTIMIZE and other table maintenance tasks need to be run.
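For illustration, an opt-in switch might look something like the sketch below; the property name and default are hypothetical, not something this PR or Iceberg defines.

```java
import java.util.Map;

class PlanningOptions {
  // Hypothetical property name and default, for illustration only; not an existing Iceberg property.
  private static final String PARALLEL_PLANNING = "read.plan.parallel-enabled";
  private static final boolean PARALLEL_PLANNING_DEFAULT = false;

  // Returns true only when the user has explicitly opted into thread-pool planning.
  static boolean parallelPlanningEnabled(Map<String, String> tableProperties) {
    String value = tableProperties.get(PARALLEL_PLANNING);
    return value == null ? PARALLEL_PLANNING_DEFAULT : Boolean.parseBoolean(value);
  }
}
```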
/**
 * This is called in the Spark Driver when data is to be materialized into {@link ColumnarBatch}
 */
@SuppressWarnings({"checkstyle:LocalVariableName", "checkstyle:RegexpSinglelineJava"})
What variable names are causing these checkstyle issues? Would it be possible to simply change them, instead of suppressing the warning for the whole function?
"Long startTime = System.currentTimeMillis();" would cause "checkstyle:LocalVariableName".
And I will solve another checkstyle of "RegexpSinglelineJava"
Ah. In that case, renaming startTime so that it doesn't shadow the external variable would be preferable.
spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java (outdated, resolved comment)
InputPartition<ColumnarBatch>[] readTasks = new InputPartition[taskSize];
Long startTime = System.currentTimeMillis();
try {
  pool.submit(() -> IntStream.range(0, taskSize).parallel()
Should the construction of all read tasks be done in a single submit to the thread pool? I don’t see any way to slow the parallelism down here so as to not potentially overwhelm the name node.
For example, I would have expected each of the ranges in the int stream to be submitted to the pool individually, so that tasks queue up waiting for their turn. Here, the parallelism looks rather unbounded. Totally open to reading this wrong (it is Sunday for me after all!).
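Roughly, a sketch of that shape, where each index is submitted as its own task so the pool size caps how many lookups run at once (planInParallel, buildReadTask, and taskSize are placeholder names, not code from this PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

class BoundedPlanning {
  // Submit each read-task construction individually; the fixed pool size bounds concurrency.
  static <T> List<T> planInParallel(int taskSize, int poolSize, IntFunction<T> buildReadTask)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      List<Future<T>> futures = new ArrayList<>(taskSize);
      for (int i = 0; i < taskSize; i++) {
        final int index = i;
        // Tasks queue up and wait for a free worker instead of fanning out all at once.
        futures.add(pool.submit(() -> buildReadTask.apply(index)));
      }
      List<T> results = new ArrayList<>(taskSize);
      for (Future<T> future : futures) {
        results.add(future.get());
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}
```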
spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java (two outdated, resolved comments)
  }).collect(Collectors.toList())).get();
} catch (Exception e) {
  e.printStackTrace();
  System.exit(-1);
I would avoid explicitly calling System.exit and instead let the exception bubble up (possibly catching it and then rethrowing it with additional information added or as another exception type). This would make it easier for end users to track down their exceptions, particularly when working in a notebook where there's limited space to display stack traces already.
Is there a specific reason you chose to call System.exit that possibly I'm not aware of?
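For example, a sketch of catching and rethrowing unchecked (awaitPlanning and the Callable passed to it are placeholders, not code from this PR):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class PlanningErrors {
  // Wait for the planning work and surface failures as unchecked exceptions with context,
  // so the calling method needs no throws clause and the original cause is preserved.
  static <T> T awaitPlanning(ExecutorService pool, Callable<T> planningWork) {
    Future<T> future = pool.submit(planningWork);
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while constructing read tasks", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Failed to construct read tasks", e.getCause());
    }
  }
}
```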
Thank you for reviewing this! I wanted to throw the exception at first, but doing so would also require changing code such as
org.apache.spark.sql.connector.read.Batch, which is outside of Iceberg. Here is the error message:
planInputPartitions()' in 'org.apache.iceberg.spark.source.SparkBatchScan' clashes with 'planInputPartitions()' in 'org.apache.spark.sql.connector.read.Batch'; overridden method does not throw 'java.lang.Exception'
spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java (outdated, resolved comment)
@southernriver I was mistaken about the existence of distributed job planning (and hence my initial concern about how necessary this PR was). I thought that there was distributed job planning already, but the PR is still open: #1421 I do know that this is being reprioritized again, but I'm not sure of any official timeline on that. Wanted to let you know. 🙂
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
For Spark2/Spark3, it always takes about 30 minutes to reach the Job Submitted state when the scan covers over 100,000 files; the more files, the longer the driver waits while planning the scan. The current code takes data locality into account and calls getBlockLocations sequentially in a single thread.
Here is the thread log:
We can use multithreading to solve this problem.
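A rough sketch of the idea (fetchBlockLocations and the file list are placeholders for the real per-file locality lookups): run the lookups on a dedicated ForkJoinPool instead of sequentially on the driver thread.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelLocality {
  // Fan the per-file locality lookups out across a bounded ForkJoinPool.
  static List<String[]> locationsInParallel(List<String> files, int parallelism) throws Exception {
    ForkJoinPool pool = new ForkJoinPool(parallelism);
    try {
      return pool.submit(() ->
          IntStream.range(0, files.size())
              .parallel()
              .mapToObj(i -> fetchBlockLocations(files.get(i)))
              .collect(Collectors.toList()))
          .get();
    } finally {
      pool.shutdown();
    }
  }

  // Placeholder standing in for the actual HDFS getBlockLocations call.
  static String[] fetchBlockLocations(String file) {
    return new String[0];
  }
}
```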
Manual Test
Before this:
After this:
21/07/09 18:41:12 INFO Reader: It took 164 s to construct 184082 readTasks with localityPreferred = true.