diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 51c9ca8269c17..e052f5b1fc5e0 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -29,7 +29,6 @@ import collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
@@ -38,10 +37,11 @@ import java.util.function.Supplier
 class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "path", DataTypes.StringType),
-    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
-    ProcedureParameter.optional(2, "needDelete", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(3, "partitions", DataTypes.StringType, ""),
-    ProcedureParameter.optional(4, "instants", DataTypes.StringType, "")
+    ProcedureParameter.optional(1, "parallelism", DataTypes.IntegerType, 100),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 100),
+    ProcedureParameter.optional(3, "needDelete", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "partitions", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "instants", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -56,31 +56,34 @@ class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
     super.checkArgs(PARAMETERS, args)
 
     val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
-    val limit = getArgValueOrDefault(args, PARAMETERS(1))
-    val needDelete = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val partitions = getArgValueOrDefault(args, PARAMETERS(3)).map(_.toString).getOrElse("")
-    val instants = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("")
+    val parallelism = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val limit = getArgValueOrDefault(args, PARAMETERS(2))
+    val needDelete = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val partitions = getArgValueOrDefault(args, PARAMETERS(4)).map(_.toString).getOrElse("")
+    val instants = getArgValueOrDefault(args, PARAMETERS(5)).map(_.toString).getOrElse("")
+
     val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
     val storage = new HoodieHadoopStorage(srcPath, storageConf)
     val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
     val metadata = HoodieTableMetadata.create(new HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
     val partitionPaths: java.util.List[String] = metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
-    val partitionPathsSize = if (partitionPaths.size() == 0) 1 else partitionPaths.size()
     val instantsList = if (StringUtils.isNullOrEmpty(instants)) Array.empty[String] else instants.split(",")
+    val fileStatus = partitionPaths.asScala.flatMap(part => {
+      val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+      HadoopFSUtils.getAllDataFilesInPartition(fs, HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+    }).toList
 
-    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPathsSize)
-    val parquetRdd = javaRdd.rdd.map(part => {
-      val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
-      HadoopFSUtils.getAllDataFilesInPartition(fs, HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part)).filter(fileStatus => {
-        var isFilter = true
-        if (!instantsList.isEmpty) {
-          val parquetCommitTime = FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
-          isFilter = instantsList.contains(parquetCommitTime)
-        }
-        isFilter
-      })
-    }).flatMap(_.toList)
-      .filter(status => {
+    if (fileStatus.isEmpty) {
+      Seq.empty
+    } else {
+      val parquetRdd = jsc.parallelize(fileStatus, Math.min(fileStatus.size, parallelism)).filter(fileStatus => {
+        if (instantsList.nonEmpty) {
+          val parquetCommitTime = FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
+          instantsList.contains(parquetCommitTime)
+        } else {
+          true
+        }
+      }).filter(status => {
         val filePath = status.getPath
         var isInvalid = false
         if (filePath.toString.endsWith(".parquet")) {
@@ -99,12 +102,13 @@ class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
         }
       }
       isInvalid
-    })
-      .map(status => Row(status.getPath.toString))
-    if (limit.isDefined) {
-      parquetRdd.take(limit.get.asInstanceOf[Int])
-    } else {
-      parquetRdd.collect()
+      }).map(status => Row(status.getPath.toString))
+
+      if (limit.isDefined) {
+        parquetRdd.take(limit.get.asInstanceOf[Int])
+      } else {
+        parquetRdd.collect()
+      }
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
index 71945f7c945ff..d074ab56fccab 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
@@ -31,6 +31,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val customParallelism = 3
       // create table
       spark.sql(
         s"""
@@ -68,13 +69,13 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
 
     // collect result for table
     var result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism)""".stripMargin).collect()
     assertResult(2) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', limit => 1)""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, limit => 1)""".stripMargin).collect()
     assertResult(1) {
       result.length
     }
@@ -85,6 +86,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val customParallelism = 3
       // create table
       spark.sql(
         s"""
@@ -118,7 +120,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
 
     // collect result for table
     val result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', needDelete => true)""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, needDelete => true)""".stripMargin).collect()
     assertResult(0) {
       result.length
     }
@@ -129,6 +131,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val customParallelism = 3
       // create table
       spark.sql(
         s"""
@@ -192,42 +195,42 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
 
     // collect result for table
     var result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism)""".stripMargin).collect()
     assertResult(4) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', partitions => 'year=2022')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, partitions => 'year=2022')""".stripMargin).collect()
     assertResult(4) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', partitions => 'year=2022/month=07')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, partitions => 'year=2022/month=07')""".stripMargin).collect()
     assertResult(2) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', partitions => 'year=2022/month=08/day=30,year=2022/month=08/day=31')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, partitions => 'year=2022/month=08/day=30,year=2022/month=08/day=31')""".stripMargin).collect()
     assertResult(2) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', partitions => 'year=2023')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, partitions => 'year=2023')""".stripMargin).collect()
     assertResult(0) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', instants => '$instantTime')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, instants => '$instantTime')""".stripMargin).collect()
     assertResult(4) {
       result.length
     }
 
     result = spark.sql(
-      s"""call show_invalid_parquet(path => '$basePath', instants => '$instantTime', partitions => 'year=2022/month=08')""".stripMargin).collect()
+      s"""call show_invalid_parquet(path => '$basePath', parallelism => $customParallelism, instants => '$instantTime', partitions => 'year=2022/month=08')""".stripMargin).collect()
     assertResult(2) {
       result.length
     }
@@ -235,3 +238,4 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 }
+
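
A note on the core change for readers skimming the diff: the old code parallelized over partition paths (one Spark task per partition path), so a table with few partitions but many files validated parquet footers almost serially. The new code flattens the file listing on the driver first, then parallelizes over the files themselves, capping the slice count at `Math.min(fileStatus.size, parallelism)` so a small table never requests more tasks than it has files. Below is a minimal standalone sketch of that pattern; the `InvalidParquetScan` object name, the single-level directory listing, and the hard-coded default are illustrative assumptions, not the procedure itself (which also resolves Hudi partition paths, filters by commit instants, and can delete offending files).

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.SparkSession

// Hypothetical driver object; illustrates only the parallelism cap.
object InvalidParquetScan {
  def main(args: Array[String]): Unit = {
    val basePath = args(0)   // directory to scan, e.g. a table base path
    val parallelism = 100    // mirrors the procedure's new default

    val spark = SparkSession.builder().appName("invalid-parquet-scan").getOrCreate()

    // List candidate files up front on the driver; the procedure does this
    // per partition path via HadoopFSUtils.getAllDataFilesInPartition.
    val conf = spark.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(new Path(basePath).toUri, conf)
    val files = fs.listStatus(new Path(basePath))
      .map(_.getPath.toString)
      .filter(_.endsWith(".parquet"))
      .toSeq

    if (files.nonEmpty) {
      // Key idea of the change: distribute over files, not partitions, and
      // never request more slices than there are files to check.
      val numSlices = math.min(files.size, parallelism)
      val invalid = spark.sparkContext
        .parallelize(files, numSlices)
        .filter { file =>
          // A file counts as invalid when its parquet footer cannot be read.
          try {
            ParquetFileReader.readFooter(new Configuration(), new Path(file), SKIP_ROW_GROUPS)
            false
          } catch {
            case _: Exception => true
          }
        }
        .collect()

      invalid.foreach(println)
    }
    spark.stop()
  }
}
```

With the new signature, callers can tune the task count per invocation, e.g. `call show_invalid_parquet(path => '/path/to/table', parallelism => 200, limit => 10)`; when omitted, `parallelism` defaults to 100.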