[HUDI-5327] Fix spark stages when using row writer #7374
Conversation
  writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator
}).collect()
table.getContext.parallelize(writeStatuses.toList.asJava)
I don't think this could be a reason for performance problems. Can you please elaborate on what you're trying to achieve here?
cc @boneanxs
Currently, collect is used internally in bulk insert for [[Dataset<Row>]] when executing clustering, which causes:
- A single Spark job is generated within it, and if there are many clustering groups, too many Spark jobs will be generated, which makes the Spark application unnecessarily complex
- Because an Executor is not explicitly specified when submitting Spark jobs through CompletableFuture.supplyAsync, the number of Spark jobs that can be executed simultaneously is limited to the number of CPU cores on the driver, which may cause a performance bottleneck
In addition, performClusteringWithRecordsRDD does not have the above problems because it does not use collect internally, so I just keep their behavior consistent (see the sketch below).
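For reference, here is a self-contained plain-Spark sketch (illustrative names only, with FakeWriteStatus standing in for WriteStatus; this is not the actual Hudi code) contrasting the collect-then-parallelize pattern with keeping the per-partition results distributed:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: collect-then-parallelize vs. keeping the write statuses as an RDD.
object CollectVsDistributedSketch {
  // Stand-in for Hudi's WriteStatus.
  case class FakeWriteStatus(partitionId: Int, numWritten: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext
    val rows = sc.parallelize(1 to 1000, numSlices = 8)

    // Before: collect() runs a Spark job per clustering group and ships all
    // statuses to the driver, only to re-parallelize them afterwards.
    val collected = rows
      .mapPartitionsWithIndex((idx, it) => Iterator(FakeWriteStatus(idx, it.size)))
      .collect()
    val reParallelized = sc.parallelize(collected.toSeq)

    // After: keep the statuses as an RDD; nothing runs here until a later
    // action actually consumes them.
    val statusRdd = rows
      .mapPartitionsWithIndex((idx, it) => Iterator(FakeWriteStatus(idx, it.size)))

    println(s"before=${reParallelized.count()}, after=${statusRdd.count()}")
    spark.stop()
  }
}
```

The actual change in this PR simply drops the collect/parallelize step in the row-writer clustering path; the sketch only illustrates the difference in how many jobs get launched and when.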
You can see https://issues.apache.org/jira/browse/HUDI-5327, where I described the case I encountered.
cc @boneanxs
Hey @Zouxxyy, Thanks for raising this issue! It's so nice to see you're trying this feature!
The reason to collect the data here is that HoodieData<WriteStatus> will be used multiple times after performClustering. I recall there is an isEmpty check that could take a lot of time (validateWriteResult), so here we directly convert to a list of WriteStatus, which reduces that time.
For the second issue, I noticed this and raised a PR to fix it: #7343. Will that address your problem? Feel free to review it!
I think performClusteringWithRecordsRDD also has the same issue; for example, when using RDDSpatialCurveSortPartitioner to optimize the data layout, it will call RDD.isEmpty, which will launch a new job.
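To illustrate the trade-off being discussed, here is a minimal plain-Spark sketch (illustrative names, not Hudi code) of how repeated actions on an uncached RDD recompute the whole lineage, which is what collecting (or persisting) the statuses avoids:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Sketch: repeated actions on an uncached RDD recompute the lineage each time.
object MultipleActionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for the expensive clustering write pipeline.
    val expensive = sc.parallelize(1 to 100, 4).map { i =>
      Thread.sleep(1) // simulate costly per-record work
      i
    }

    // Each action below re-runs the map above because nothing is cached:
    // the isEmpty check (like validateWriteResult) is one full pass,
    // and the later consumer of the result is another one.
    val emptyCheck = expensive.isEmpty()
    val total = expensive.count()

    // Persisting (or collecting to the driver, as the row-writer path did)
    // avoids the recomputation, at the cost of memory.
    val cached = expensive.persist(StorageLevel.MEMORY_AND_DISK)
    println(s"empty=$emptyCheck total=$total cached=${cached.count()}")
    spark.stop()
  }
}
```

Whether the extra memory for caching or collecting is worth it compared to recomputation is exactly the trade-off this thread is about.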
Yes, we can fix this by directly using getStat, but what if updateIndex computes writeStatusList multiple times? If we can directly dereference RDD<WriteStatus> to a list of WriteStatus at one suitable point (as performClusteringWithRecordsAsRow already does), we no longer need to worry about such issues.
As for the thread-pool parallelism causing a performance issue, I think performClusteringWithRecordsRDD has the same issue. Since we might call partitioner.repartitionRecords, a new job can also be launched inside the Future thread, for example https://github.com/apache/hudi/blob/ea48a85efcf8e331d0cc105d426e830b8bfe5b37/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java#L66 (checking whether the RDD is empty or not), or the sortBy call in RDDCustomColumnsSortPartitioner (sortBy uses RangePartitioner, which needs to sample the RDD first to decide the ranges, and that also launches a job in the Future).
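To make the thread-pool point concrete, here is a small plain-Spark sketch (illustrative names and pool sizes, not Hudi code) showing jobs submitted from CompletableFuture.supplyAsync with and without an explicit executor; without one, the default common pool caps concurrency at roughly the driver's core count:

```scala
import java.util.concurrent.{CompletableFuture, Executors}
import org.apache.spark.sql.SparkSession

// Sketch: Spark jobs launched from futures are throttled by the pool they run on.
object FuturePoolSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for one clustering group: isEmpty / sortBy / repartition inside
    // the future each trigger a Spark job, like the partitioners mentioned above.
    def runOneGroup(id: Int): String = {
      val n = sc.parallelize(1 to 1000).map(_ * id).count()
      s"group $id produced $n elements"
    }

    // Default: runs on ForkJoinPool.commonPool(), whose parallelism is tied to
    // the number of driver cores, so only that many groups progress at once.
    val defaults = (1 to 8).map(i => CompletableFuture.supplyAsync[String](() => runOneGroup(i)))

    // With an explicit executor the submission parallelism becomes configurable,
    // which is the direction a fix like #7343 takes.
    val pool = Executors.newFixedThreadPool(8)
    val explicit = (1 to 8).map(i => CompletableFuture.supplyAsync[String](() => runOneGroup(i), pool))

    (defaults ++ explicit).foreach(f => println(f.join()))
    pool.shutdown()
    spark.stop()
  }
}
```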
For #7343, you should be right; I overlooked that other operations may also generate a job. However, I'm wondering if it's necessary to specifically set a parameter.
I'd very much appreciate it if you could review the PR and share your thoughts. Could you please explain more in that PR? :)
@Zouxxyy in this case we should actually not be relying on persist as a way to avoid double execution, since persisting is essentially just a caching mechanism (re-using cached blocks on executors) and it should not be relied upon (it could fail at any point if, for example, one of the executors fails, making you recompute the whole RDD).
@alexeykudinkin, ok. WriteStatus has a large class attribute, writtenRecords, so this approach works as long as collect does not cause OOM.
Change Logs
fix https://issues.apache.org/jira/browse/HUDI-5327
Currently, collect is used internally in bulk insert for [[Dataset<Row>]] when executing clustering, which causes:
- A single Spark job is generated within it, and if there are many clustering groups, too many Spark jobs will be generated, which makes the Spark application unnecessarily complex
- Because an Executor is not explicitly specified when submitting Spark jobs through CompletableFuture.supplyAsync, the number of Spark jobs that can be executed simultaneously is limited to the number of CPU cores on the driver, which may cause a performance bottleneck
So, just remove collect in bulk insert for [[Dataset<Row>]].
Impact
Make the Spark app simpler and avoid possible performance bottlenecks when hoodie.datasource.write.row.writer.enable is enabled. In addition, performClusteringWithRecordsRDD does not have the above problems because it does not use collect internally, so I just keep their behavior consistent.
Risk level (write none, low medium or high below)
low
Documentation Update
None
Contributor's checklist