From 3b3f37a105e6e30363bb96c5d42bade35b529632 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Tue, 4 Apr 2023 23:03:36 -0700
Subject: [PATCH 1/5] reverting FS based listing for full cleaning in clean Planner

---
 .../apache/hudi/table/action/clean/CleanPlanner.java | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 6710fb0124c8b..ae183b678e69a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -44,7 +44,6 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieSavepointException;
-import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.fs.Path;
@@ -213,15 +212,7 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
    */
  private List<String> getPartitionPathsForFullCleaning() {
    // Go to brute force mode of scanning all partitions
-    try {
-      // Because the partition of BaseTableMetadata has been deleted,
-      // all partition information can only be obtained from FileSystemBackedTableMetadata.
-      FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context,
-          context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning());
-      return fsBackedTableMetadata.getAllPartitionPaths();
-    } catch (IOException e) {
-      return Collections.emptyList();
-    }
+    return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
  }
 
  /**

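Note on the revert above: FSUtils.getAllPartitionPaths honors hoodie.metadata.enable, so the full-clean planner once again lists partitions through the metadata table when it is available, falling back to an engine-parallelized file-system listing otherwise. The removed code not only bypassed the metadata table unconditionally, it also swallowed IOException and returned an empty partition list, which would silently turn a full clean into a no-op. A minimal sketch of the restored call path (Scala here, matching the test code later in the series; `context` and `config` stand in for CleanPlanner's HoodieEngineContext and HoodieWriteConfig fields):

    import org.apache.hudi.common.fs.FSUtils
    import scala.collection.JavaConverters._

    // Lists partition paths via the metadata table when enabled, otherwise via
    // a parallelized file listing; IO errors now propagate instead of being
    // collapsed into an empty list.
    val partitions: Seq[String] =
      FSUtils.getAllPartitionPaths(context, config.getMetadataConfig, config.getBasePath).asScala
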
From 43150ca55b13c526dbd1ec1cd27c7bcd44c6ca6f Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Thu, 6 Apr 2023 16:03:31 -0700
Subject: [PATCH 2/5] Fixing delete partition flow

---
 .../metadata/HoodieTableMetadataUtil.java  | 11 +---
 .../hudi/TestAlterTableDropPartition.scala | 65 ++++++++++++++++++-
 2 files changed, 66 insertions(+), 10 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 76c29ba45e818..9709b7820c571 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -35,9 +35,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -319,10 +317,7 @@ public static List<HoodieRecord> convertMetadataToFilesPartitionRecords(HoodieCo
     // Add record bearing added partitions list
     List<String> partitionsAdded = getPartitionsAdded(commitMetadata);
 
-    // Add record bearing deleted partitions list
-    List<String> partitionsDeleted = getPartitionsDeleted(commitMetadata);
-
-    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded, partitionsDeleted));
+    records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsAdded));
 
     // Update files listing records for each individual partition
     List<HoodieRecord<HoodieMetadataPayload>> updatedPartitionFilesRecords =
@@ -381,7 +376,7 @@ private static List<String> getPartitionsAdded(HoodieCommitMetadata commitMetada
   }
 
   private static List<String> getPartitionsDeleted(HoodieCommitMetadata commitMetadata) {
-    if (commitMetadata instanceof HoodieReplaceCommitMetadata
+    /*if (commitMetadata instanceof HoodieReplaceCommitMetadata
         && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) {
       Map<String, List<String>> partitionToReplaceFileIds =
           ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds();
@@ -390,7 +385,7 @@ private static List<String> getPartitionsDeleted(HoodieCommitMeta
           // We need to make sure we properly handle case of non-partitioned tables
           .map(HoodieTableMetadataUtil::getPartitionIdentifier)
           .collect(Collectors.toList());
-    }
+    }*/
 
     return Collections.emptyList();
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 7fcd9f29e78b3..84caa148daeae 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -114,16 +114,33 @@
          |location '$tablePath'
          |""".stripMargin)
 
+    df.write.format("hudi")
+      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "ts")
+      .option(PARTITIONPATH_FIELD.key, "dt")
+      .option(URL_ENCODE_PARTITIONING.key(), urlencode)
+      .option(KEYGENERATOR_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName)
+      .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+      .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+      .mode(SaveMode.Append)
+      .save(tablePath)
+
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
 
+    // trigger clean so that partition deletion kicks in.
+    spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+      .collect()
+
     val partitionPath = if (urlencode) {
       PartitionPathEncodeUtils.escapePathName("2021/10/01")
     } else {
       "2021/10/01"
     }
     checkAnswer(s"select dt from $tableName")(Seq(s"2021/10/02"))
-    assertResult(true)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
+    assertResult(false)(existsPath(s"${tmp.getCanonicalPath}/$tableName/$partitionPath"))
 
     // show partitions
     if (urlencode) {
@@ -221,11 +238,20 @@
         "Found duplicate keys 'dt'")
     }
 
+    // insert data
+    spark.sql(s"""insert into $tableName values (3, "z5", "v1", "2021-10-01"), (4, "l5", "v1", "2021-10-02")""")
+
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
 
-    checkAnswer(s"select id, name, ts, dt from $tableName")(Seq(2, "l4", "v1", "2021-10-02"))
+    // trigger clean so that partition deletion kicks in.
+    spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+      .collect()
+
+    checkAnswer(s"select id, name, ts, dt from $tableName")(
+      Seq(2, "l4", "v1", "2021-10-02"),
+      Seq(4, "l5", "v1", "2021-10-02")
+    )
 
     // show partitions
     checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02"))
@@ -264,9 +290,27 @@
       checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")(
         "All partition columns need to be specified for Hoodie's partition"
       )
+
+      df.write.format("hudi")
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+        .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+        .option(RECORDKEY_FIELD.key, "id")
+        .option(PRECOMBINE_FIELD.key, "ts")
+        .option(PARTITIONPATH_FIELD.key, "year,month,day")
+        .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+        .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+        .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+        .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+        .mode(SaveMode.Append)
+        .save(tablePath)
+
       // drop 2021-10-01 partition
       spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
 
+      // trigger clean so that partition deletion kicks in.
+      spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+        .collect()
+
       checkAnswer(s"select id, name, ts, year, month, day from $tableName")(
         Seq(2, "l4", "v1", "2021", "10", "02")
       )
@@ -314,9 +358,26 @@
          | )
          |""".stripMargin)
 
+      df.write.format("hudi")
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+        .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
+        .option(RECORDKEY_FIELD.key, "id")
+        .option(PRECOMBINE_FIELD.key, "ts")
+        .option(PARTITIONPATH_FIELD.key, "year,month,day")
+        .option(HIVE_STYLE_PARTITIONING.key, hiveStyle)
+        .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName)
+        .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1")
+        .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1")
+        .mode(SaveMode.Append)
+        .save(tablePath)
+
       // drop 2021-10-01 partition
       spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
 
+      // trigger clean so that partition deletion kicks in.
+      spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
+        .collect()
+
       // insert data
       spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")

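Note on the intent of this patch, as read from the diff: with partitionsDeleted no longer folded into the partition-list record, ALTER TABLE ... DROP PARTITION stays a purely logical delete, a replacecommit that marks the partition's file groups as replaced. Physical removal of the files (and hence of the partition path) is deferred to the cleaner, which is why each test now calls the run_clean procedure and why the existsPath assertion flips from true to false. The minimal lifecycle, using a hypothetical table t partitioned by dt:

    // Logical delete: writes a replacecommit, files remain on storage.
    spark.sql("alter table t drop partition (dt='2021/10/01')")

    // Physical delete: the cleaner removes the replaced file groups.
    spark.sql("call run_clean(table => 't', retain_commits => 1)").collect()
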
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)") + .collect() + + checkAnswer(s"select id, name, ts, dt from $tableName")( + Seq(2, "l4", "v1", "2021-10-02"), + Seq(4, "l5", "v1", "2021-10-02") + ) // show partitions checkAnswer(s"show partitions $tableName")(Seq("dt=2021-10-02")) @@ -264,9 +290,27 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { checkExceptionContain(s"alter table $tableName drop partition (year='2021', month='10')")( "All partition columns need to be specified for Hoodie's partition" ) + + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "year,month,day") + .option(HIVE_STYLE_PARTITIONING.key, hiveStyle) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Append) + .save(tablePath) + // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") + // trigger clean so that partition deletion kicks in. + spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)") + .collect() + checkAnswer(s"select id, name, ts, year, month, day from $tableName")( Seq(2, "l4", "v1", "2021", "10", "02") ) @@ -314,9 +358,26 @@ class TestAlterTableDropPartition extends HoodieSparkSqlTestBase { | ) |""".stripMargin) + df.write.format("hudi") + .option(HoodieWriteConfig.TBL_NAME.key, tableName) + .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL) + .option(RECORDKEY_FIELD.key, "id") + .option(PRECOMBINE_FIELD.key, "ts") + .option(PARTITIONPATH_FIELD.key, "year,month,day") + .option(HIVE_STYLE_PARTITIONING.key, hiveStyle) + .option(KEYGENERATOR_CLASS_NAME.key, classOf[ComplexKeyGenerator].getName) + .option(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "1") + .option(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "1") + .mode(SaveMode.Append) + .save(tablePath) + // drop 2021-10-01 partition spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") + // trigger clean so that partition deletion kicks in. 
+ spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)") + .collect() + // insert data spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""") From 08ec5be9ed4587a91f76f2121ada5e16aea6e194 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 18 Apr 2023 07:03:25 -0700 Subject: [PATCH 3/5] addressing comments --- .../hudi/metadata/HoodieTableMetadataUtil.java | 15 --------------- .../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 13 +++++++++++++ .../sql/hudi/TestAlterTableDropPartition.scala | 13 +++++++++++++ 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 9709b7820c571..8ffc8df952067 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -375,21 +375,6 @@ private static List getPartitionsAdded(HoodieCommitMetadata commitMetada .collect(Collectors.toList()); } - private static List getPartitionsDeleted(HoodieCommitMetadata commitMetadata) { - /*if (commitMetadata instanceof HoodieReplaceCommitMetadata - && WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) { - Map> partitionToReplaceFileIds = - ((HoodieReplaceCommitMetadata) commitMetadata).getPartitionToReplaceFileIds(); - - return partitionToReplaceFileIds.keySet().stream() - // We need to make sure we properly handle case of non-partitioned tables - .map(HoodieTableMetadataUtil::getPartitionIdentifier) - .collect(Collectors.toList()); - }*/ - - return Collections.emptyList(); - } - /** * Convert commit action metadata to bloom filter records. 
From 7e73a5881712dba18c44bb38bb77750a18330062 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Thu, 20 Apr 2023 22:11:37 -0700
Subject: [PATCH 5/5] Fixing tests

---
 .../org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala | 2 ++
 1 file changed, 2 insertions(+)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
index 613c3c6192960..80cea8b031056 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala
@@ -396,6 +396,8 @@
       // drop 2021-10-01 partition
       spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')")
 
+      spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")
+
       // trigger clean so that partition deletion kicks in.
       spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)")
         .collect()
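Note on why this last fix works, hedged as a reading of the cleaner semantics: run_clean with retain_commits => 1 keeps the latest commit out of the cleaner's reach, so if the replacecommit produced by the drop is also the newest instant on the timeline, its replaced file groups are not yet eligible for deletion and the totalDeletedFiles assertion fails. Inserting a row between the drop and the clean pushes the replacecommit out of the retained window (names as in the test above):

    spark.sql(s"alter table $tableName drop partition (year='2021', month='10', day='01')") // replacecommit C1
    spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", "10", "02")""")     // commit C2
    // retain_commits => 1 retains C2, so C1's replaced file groups can now be cleaned.
    spark.sql(s"call run_clean(table => '$tableName', retain_commits => 1)").collect()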