[HUDI-9334] Optimize Parallelism of show_invalid_parquet #13206
zhangyue19921010 merged 8 commits into apache:master
Conversation
zhangyue19921010
left a comment
LGTM, waiting for CI to pass.
```scala
case ex: Exception =>
  isInvalid = true
}
val parquetRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size, 1)).filter(fileStatus => {
```
How about allowing a custom parallelism to be passed in, and then aggregating the file statuses into that parallelism? Using the number of partitions as the parallelism leads to too little concurrency and long-running single tasks. But wouldn't directly using the number of files as the degree of parallelism produce too many tasks? Some scenarios involve tens of thousands of files, and tens of thousands of concurrent tasks would put a lot of pressure on Spark's task scheduling.
Thanks for the advice. I'll introduce a custom parallelism to avoid creating too many tasks.
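One way to read the reviewer's concern is to let the task count grow with the file count but cap it at a user-supplied limit. A minimal sketch of that idea (the helper name `chooseParallelism` and the cap semantics are illustrative only; the merged code may differ):

```scala
// Sketch only: scale RDD parallelism with the number of files, but cap it
// at a user-supplied limit so tens of thousands of files do not turn into
// tens of thousands of Spark tasks.
def chooseParallelism(numFiles: Int, userParallelism: Int): Int =
  math.max(1, math.min(numFiles, userParallelism))
```

With a cap like this, each Spark task handles a batch of files instead of a single file, which keeps scheduling overhead bounded.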
@hudi-bot run azure
```scala
ProcedureParameter.optional(2, "needDelete", DataTypes.BooleanType, false),
ProcedureParameter.optional(3, "partitions", DataTypes.StringType, ""),
ProcedureParameter.optional(4, "instants", DataTypes.StringType, "")
ProcedureParameter.required(1, "customParallelism", DataTypes.IntegerType),
```
How about

```scala
ProcedureParameter.optional(1, "parallelism", DataTypes.IntegerType, 100),
```

```scala
if (fileStatus.isEmpty) {
  Seq.empty
} else {
  val parquetRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size, parallelism)).filter(fileStatus => {
```
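With an optional `parallelism` parameter along these lines, calling the procedure could look like the following (the `path` parameter name and the values shown are assumptions for illustration, not taken from this PR):

```scala
// Hypothetical invocation from a Spark session; only `parallelism` is
// confirmed by the discussion above, other names are assumed.
spark.sql(
  "CALL show_invalid_parquet(path => 'hdfs:///tmp/hudi_tbl', parallelism => 200)")
```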
@hudi-bot run azure
There are test failures:

```
[ERROR] Errors:
[ERROR] TestDataSourceUtils>HoodieClientTestBase.setUp:70->HoodieSparkClientTestHarness.initResources:
```
* [HUDI-9334] optimize parallelism of show_invalid_parquet
* solve type mismatch
* introduce customParallelism
* rm invalid imports
* switch parallelism to optional
* fix scalastyle
* fix scalastyle

Co-authored-by: fhan <yfhanfei@jd.com>
Change Logs
The former parallelism was based on the number of partition paths, which may not be enough in some scenarios.
This PR optimizes that by deriving the parallelism from the number of files, tunable via an optional `parallelism` parameter.
before: (screenshot)

after: (screenshot)
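The gist of the change can be sketched as follows (a simplified paraphrase of the diff discussed above, not the exact merged code; `partitionPaths`, `fileStatus`, and `parallelism` stand in for the PR's variables):

```scala
// Before (sketch): parallelism tied to the number of partition paths,
// so a table with few partitions but many files gets few, long tasks.
val beforeRdd = jsc.parallelize(partitionPaths, Math.max(partitionPaths.size, 1))

// After (sketch): parallelism derived from the number of files, with a
// user-supplied `parallelism` parameter to tune the task count.
val afterRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size, parallelism))
```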
Impact
hudi-spark-datasource
Risk level (write none, low, medium, or high below)
low
Documentation Update
none
Contributor's checklist