From a36332015b1d0529a20a539724da39e988c88661 Mon Sep 17 00:00:00 2001
From: XuQianJin-Stars
Date: Wed, 3 Aug 2022 20:24:07 +0800
Subject: [PATCH 1/2] [HUDI-4533] Fix RunCleanProcedure's ArrayIndexOutOfBoundsException

---
 .../procedures/RunCleanProcedure.scala        | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index b5d942d5e6f7..878de13805b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -26,6 +26,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
+import java.util
+
 class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging {
 
   private val PARAMETERS = Array[ProcedureParameter](
@@ -75,14 +77,18 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     }
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
     val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
+    val rows = new util.ArrayList[Row]
+
+    if (hoodieCleanMeta != null) {
+      rows.add(Row(hoodieCleanMeta.getStartCleanTime,
+        hoodieCleanMeta.getTimeTakenInMillis,
+        hoodieCleanMeta.getTotalFilesDeleted,
+        hoodieCleanMeta.getEarliestCommitToRetain,
+        JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
+        hoodieCleanMeta.getVersion))
+    }
 
-    if (hoodieCleanMeta == null) Seq(Row.empty)
-    else Seq(Row(hoodieCleanMeta.getStartCleanTime,
-      hoodieCleanMeta.getTimeTakenInMillis,
-      hoodieCleanMeta.getTotalFilesDeleted,
-      hoodieCleanMeta.getEarliestCommitToRetain,
-      JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
-      hoodieCleanMeta.getVersion))
+    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
   }
 }
 

From 72fd17fac42202fb9a685b5bce97b00b9b045d2a Mon Sep 17 00:00:00 2001
From: XuQianJin-Stars
Date: Thu, 4 Aug 2022 09:58:30 +0800
Subject: [PATCH 2/2] [HUDI-4533] Fix RunCleanProcedure's ArrayIndexOutOfBoundsException

---
 .../procedures/RunCleanProcedure.scala        | 21 ++++++++-------------
 .../hudi/procedure/TestCleanProcedure.scala   |  4 ++++
 2 files changed, 12 insertions(+), 13 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
index 878de13805b9..2a0143bafba8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunCleanProcedure.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import java.util.function.Supplier
 import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.util.JsonUtils
@@ -26,7 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
-import java.util
+import java.util.function.Supplier
 
 class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging {
 
@@ -77,18 +76,14 @@ class RunCleanProcedure extends BaseProcedure with ProcedureBuilder with Logging
     }
     val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, props)
     val hoodieCleanMeta = client.clean(cleanInstantTime, scheduleInLine, skipLocking)
-    val rows = new util.ArrayList[Row]
-
-    if (hoodieCleanMeta != null) {
-      rows.add(Row(hoodieCleanMeta.getStartCleanTime,
-        hoodieCleanMeta.getTimeTakenInMillis,
-        hoodieCleanMeta.getTotalFilesDeleted,
-        hoodieCleanMeta.getEarliestCommitToRetain,
-        JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
-        hoodieCleanMeta.getVersion))
-    }
 
-    rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList
+    if (hoodieCleanMeta == null) Seq.empty
+    else Seq(Row(hoodieCleanMeta.getStartCleanTime,
+      hoodieCleanMeta.getTimeTakenInMillis,
+      hoodieCleanMeta.getTotalFilesDeleted,
+      hoodieCleanMeta.getEarliestCommitToRetain,
+      JsonUtils.getObjectMapper.writeValueAsString(hoodieCleanMeta.getBootstrapPartitionMetadata),
+      hoodieCleanMeta.getVersion))
   }
 }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
index 316dccca5201..49d3452d6959 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanProcedure.scala
@@ -58,6 +58,10 @@ class TestCleanProcedure extends HoodieSparkSqlTestBase {
       checkAnswer(s"select id, name, price, ts from $tableName order by id") (
         Seq(1, "a1", 13, 1000)
       )
+
+      val result2 = spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+        .collect()
+      assertResult(0)(result2.length)
     }
   }
 
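
Reviewer note (not part of either patch): the sketch below illustrates why the original return value broke callers and what the final behaviour looks like. It assumes only Spark's public Row API; the object name EmptyRowSketch and the println messages are illustrative and do not come from this PR. Before the fix, run_clean answered "nothing to clean" with Seq(Row.empty); a consumer that reads the procedure's declared output columns by position then indexes into a zero-field row and hits the ArrayIndexOutOfBoundsException named in HUDI-4533. After patch 2/2 the procedure returns Seq.empty, which is exactly what the new assertion in TestCleanProcedure checks (result2.length == 0).

    import org.apache.spark.sql.Row

    // Hypothetical standalone demo, not part of the Hudi code base.
    object EmptyRowSketch {
      def main(args: Array[String]): Unit = {
        // Old behaviour: a no-op clean still surfaced one zero-field row.
        val before: Seq[Row] = Seq(Row.empty)
        try {
          before.head.get(0) // positional read of the first output column
        } catch {
          case e: ArrayIndexOutOfBoundsException =>
            println(s"reading column 0 of Row.empty fails: $e")
        }

        // New behaviour: no clean metadata means no rows at all, so callers
        // see an empty result instead of a row that cannot be decoded.
        val after: Seq[Row] = Seq.empty
        println(s"rows returned when there is nothing to clean: ${after.length}")
      }
    }

Returning no rows rather than a placeholder row also keeps the result consistent with the procedure's declared output schema, since every row that is actually returned carries all of the advertised columns.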