diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 0413f75ce066d..a33ad99e03a33 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -205,6 +205,17 @@ public abstract HoodieWriteMetadata bulkInsertPrepped(HoodieEngineContext con */ public abstract HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String instantTime, I records); + /** + * Deletes all existing records of the Hoodie table and inserts the specified new records into the table at the supplied instantTime, + * replacing every existing partition rather than only the partition paths contained in the input records. + * + * @param context HoodieEngineContext + * @param instantTime Instant time for the replace action + * @param records input records + * @return HoodieWriteMetadata + */ + public abstract HoodieWriteMetadata insertOverwriteTable(HoodieEngineContext context, String instantTime, I records); + public HoodieWriteConfig getConfig() { return config; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index acb010c313065..b8ae370f129d4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -106,6 +106,11 @@ public HoodieWriteMetadata> insertOverwrite(HoodieEngineContex throw new HoodieNotSupportedException("InsertOverWrite is not supported yet"); } + @Override + public HoodieWriteMetadata> insertOverwriteTable(HoodieEngineContext context, String instantTime, List> records) { + throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet"); + } + @Override public Option scheduleCompaction(HoodieEngineContext context, String instantTime, Option> extraMetadata) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
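As an illustration of the client-level API added below in SparkRDDWriteClient, here is a minimal Scala sketch of the intended call sequence. It is not part of the patch: the client and records are assumed to be built elsewhere (for example through DataSourceUtils.createHoodieClient and a key generator), and starting the commit with the replace action type mirrors what HoodieSparkSqlWriter does for replace operations.

import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.model.{HoodieRecord, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
import org.apache.spark.api.java.JavaRDD

import scala.collection.JavaConverters._

object InsertOverwriteTableClientSketch {
  // Replaces the entire table content with `records` in a single replacecommit.
  def overwriteWholeTable(client: SparkRDDWriteClient[OverwriteWithLatestAvroPayload],
                          records: JavaRDD[HoodieRecord[OverwriteWithLatestAvroPayload]]): Unit = {
    val instantTime = HoodieActiveTimeline.createNewInstantTime()
    // insert_overwrite_table is recorded as a replacecommit on the timeline.
    client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION)
    val result: HoodieWriteResult = client.insertOverwriteTable(records, instantTime)
    // Every partition that existed before the write shows up here with the file ids
    // that were logically replaced; nothing is physically deleted from the base path.
    result.getPartitionToReplaceFileIds.asScala.foreach { case (partition, fileIds) =>
      println(s"replaced ${fileIds.size()} file group(s) in partition '$partition'")
    }
  }
}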
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 56f06898abba2..10a55df9f882d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -191,6 +191,23 @@ public HoodieWriteResult insertOverwrite(JavaRDD> records, final return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); } + + /** + * Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table. + + * @param records HoodieRecords to insert + * @param instantTime Instant time of the commit + * @return HoodieWriteResult - write statuses (to inspect errors and counts) and the mapping from partitions to replaced file ids + */ + public HoodieWriteResult insertOverwriteTable(JavaRDD> records, final String instantTime) { + HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); + table.validateInsertSchema(); + setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE); + this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime); + HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records); + return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); + } + @Override public JavaRDD bulkInsert(JavaRDD> records, String instantTime) { return bulkInsert(records, instantTime, Option.empty()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index f2b336432b247..99a8f1f3c10c3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -45,6 +45,7 @@ import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor; import org.apache.hudi.table.action.clean.SparkCleanActionExecutor; import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor; +import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor; import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor; @@ -129,6 +130,11 @@ public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String i return new SparkInsertOverwriteCommitActionExecutor(context, config, this, instantTime, records).execute(); } + @Override + public HoodieWriteMetadata> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD> records) { + return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute(); + } + @Override public Option scheduleCompaction(HoodieEngineContext context, String instantTime, Option> extraMetadata) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java index 2771a22c9f12c..1e3822016a765 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java @@ -44,7 +44,14 @@ public class SparkInsertOverwriteCommitActionExecutor> inputRecordsRDD) { - super(context, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE); + this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE); + } + + public
SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, JavaRDD> inputRecordsRDD, + WriteOperationType writeOperationType) { + super(context, config, table, instantTime, writeOperationType); this.inputRecordsRDD = inputRecordsRDD; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java new file mode 100644 index 0000000000000..e349657b7e44e --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieEngineContext; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SparkInsertOverwriteTableCommitActionExecutor> + extends SparkInsertOverwriteCommitActionExecutor { + + public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context, + HoodieWriteConfig config, HoodieTable table, + String instantTime, JavaRDD> inputRecordsRDD) { + super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE); + } + + protected List getAllExistingFileIds(String partitionPath) { + return table.getSliceView().getLatestFileSlices(partitionPath) + .map(fg -> fg.getFileId()).distinct().collect(Collectors.toList()); + } + + @Override + protected Map> getPartitionToReplacedFileIds(JavaRDD writeStatuses) { + Map> partitionToExistingFileIds = new HashMap<>(); + try { + List partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(), + table.getMetaClient().getBasePath(), false); + JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context); + if (partitionPaths != null && partitionPaths.size() > 0) { + context.setJobStatus(this.getClass().getSimpleName(), 
"Getting ExistingFileIds of all partitions"); + JavaRDD partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size()); + partitionToExistingFileIds = partitionPathRdd.mapToPair( + partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap(); + } + } catch (IOException e) { + throw new HoodieCommitException("In InsertOverwriteTable action failed to get existing fileIds of all partition " + + config.getBasePath() + " at time " + instantTime, e); + } + return partitionToExistingFileIds; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index f6386b94d53b9..5f328a9bc69e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -38,10 +38,12 @@ public enum WriteOperationType { // delete DELETE("delete"), BOOTSTRAP("bootstrap"), - // insert overwrite + // insert overwrite with static partitioning INSERT_OVERWRITE("insert_overwrite"), // cluster CLUSTER("cluster"), + // insert overwrite with dynamic partitioning + INSERT_OVERWRITE_TABLE("insert_overwrite_table"), // used for old version UNKNOWN("unknown"); @@ -72,6 +74,8 @@ public static WriteOperationType fromValue(String value) { return DELETE; case "insert_overwrite": return INSERT_OVERWRITE; + case "insert_overwrite_table": + return INSERT_OVERWRITE_TABLE; default: throw new HoodieException("Invalid value of Type."); } @@ -88,4 +92,4 @@ public String value() { public static boolean isChangingRecords(WriteOperationType operationType) { return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE; } -} \ No newline at end of file +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index bf9728045e530..17e93feca95d2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -451,6 +451,10 @@ public List generateInsertsContainsAllPartitions(String instantTim return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList()); } + public List generateInsertsForPartition(String instantTime, Integer n, String partition) { + return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList()); + } + public Stream generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) { return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions, () -> partitionPaths[RAND.nextInt(partitionPaths.length)], diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 760dd961be65b..c8a0d7f82e96d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -86,7 +86,7 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream 
FSUtils.getFileId(split.getPath().getName()))); // Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, - HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION)) + HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)) .filterCompletedInstants().lastInstant().get().getTimestamp(); latestFileSlices.forEach(fileSlice -> { List dataFileSplits = groupedInputSplits.get(fileSlice.getFileId()); diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 37572c3678485..1cb63c98a2db4 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -191,7 +191,7 @@ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, Stri } public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) { - if (operation == WriteOperationType.INSERT_OVERWRITE) { + if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) { return HoodieTimeline.REPLACE_COMMIT_ACTION; } else { return CommitUtils.getCommitActionType(tableType); @@ -211,6 +211,8 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime)); case INSERT_OVERWRITE: return client.insertOverwrite(hoodieRecords, instantTime); + case INSERT_OVERWRITE_TABLE: + return client.insertOverwriteTable(hoodieRecords, instantTime); default: throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString()); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java index 1467cf6bca8f0..734e0c0ea7a91 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java +++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java @@ -74,7 +74,8 @@ public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, Strin if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) { return metaClient.getActiveTimeline().getTimelineOfActions( CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION, - HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants(); + HoodieActiveTimeline.DELTA_COMMIT_ACTION, + HoodieActiveTimeline.REPLACE_COMMIT_ACTION)).filterCompletedInstants(); } else { return metaClient.getCommitTimeline().filterCompletedInstants(); } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 73f70e72d3bb6..9ff6b2f081775 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -157,6 +157,7 @@ object DataSourceWriteOptions { val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value + val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value val DEFAULT_OPERATION_OPT_VAL = 
UPSERT_OPERATION_OPT_VAL /** diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index fe0f3f5e47316..e1095012eb2b1 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -93,6 +93,13 @@ private[hudi] object HoodieSparkSqlWriter { operation = WriteOperationType.INSERT } + // If the save mode is Overwrite, force the operation to INSERT_OVERWRITE_TABLE so that + // DataSourceUtils.doWriteOperation uses client.insertOverwriteTable to overwrite the table. + // This replaces the old behavior of deleting the table path (fs.delete(tablePath)) before rewriting it. + if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) { + operation = WriteOperationType.INSERT_OVERWRITE_TABLE + } + val jsc = new JavaSparkContext(sparkContext) val basePath = new Path(path.get) val instantTime = HoodieActiveTimeline.createNewInstantTime() @@ -319,10 +326,6 @@ private[hudi] object HoodieSparkSqlWriter { if (operation != WriteOperationType.DELETE) { if (mode == SaveMode.ErrorIfExists && tableExists) { throw new HoodieException(s"hoodie table at $tablePath already exists.") - } else if (mode == SaveMode.Overwrite && tableExists) { - log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.") - fs.delete(tablePath, true) - tableExists = false } } else { // Delete Operation only supports Append mode diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 5b746deb0d51a..bf71e68688bdf 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,7 +18,13 @@ package org.apache.hudi.functional import java.sql.{Date, Timestamp} +import java.util.function.Supplier +import java.util.stream.Stream +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieInstant +import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.HoodieClientTestBase @@ -156,6 +162,79 @@ class TestCOWDataSource extends HoodieClientTestBase { assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled } + @Test def testOverWriteModeUseReplaceAction(): Unit = { + val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + + val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true) + val commits =
metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray + .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction) + assertEquals(2, commits.size) + assertEquals("commit", commits(0)) + assertEquals("replacecommit", commits(1)) + } + + @Test def testOverWriteModeUseReplaceActionOnDisjointPartitions(): Unit = { + // step1: Write 5 records to the hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH + val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + + // step2: Write 7 more records using SaveMode.Overwrite for partition2 DEFAULT_SECOND_PARTITION_PATH + val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Overwrite) + .save(basePath) + + val allRecords = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*") + allRecords.registerTempTable("tmpTable") + + spark.sql(String.format("select count(*) from tmpTable")).show() + + // step3: Query the row count from the hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH + val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect() + assertEquals("0", recordCountForPartition1(0).get(0).toString) + + // step4: Query the row count from the hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH + val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect() + assertEquals("7", recordCountForPartition2(0).get(0).toString) + + // step5: Query the total row count from the hoodie table + val recordCount = spark.sql(String.format("select count(*) from tmpTable")).collect() + assertEquals("7", recordCount(0).get(0).toString) + + // step6: Query the partition column and count rows for partition2 DEFAULT_SECOND_PARTITION_PATH using collect and then filter + val recordsForPartitionColumn = spark.sql(String.format("select partition from tmpTable")).collect() + val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size + assertEquals(7, filterSecondPartitionCount) + + val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true) + val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray + .map(instant => instant.asInstanceOf[HoodieInstant].getAction) + assertEquals(2, commits.size) + assertEquals("commit", commits(0)) + assertEquals("replacecommit", commits(1)) + } + @Test def testDropInsertDup(): Unit = { val insert1Cnt = 10 val insert2DupKeyCnt = 9
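Taken together, the DataSourceOptions and HoodieSparkSqlWriter changes above mean that a plain SaveMode.Overwrite write (or an explicit insert_overwrite_table operation) now produces a replacecommit instead of deleting the base path. A minimal Scala sketch of that usage follows; the base path, the input file, and the field names (_row_key, partition, timestamp, mirroring the test schema) are placeholders for illustration.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}

object OverwriteTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-insert-overwrite-table").master("local[2]").getOrCreate()
    val basePath = "/tmp/hudi/example_table"                     // placeholder table path
    val inputDF = spark.read.json("/tmp/input/new_snapshot.json") // placeholder input batch

    inputDF.write.format("org.apache.hudi")
      .option(HoodieWriteConfig.TABLE_NAME, "example_table")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      // Optional: SaveMode.Overwrite alone is now forced to insert_overwrite_table anyway.
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
      .mode(SaveMode.Overwrite)
      .save(basePath)

    spark.stop()
  }
}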
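The tests above assert the resulting timeline directly; the same check can be run against any table to confirm that an Overwrite write was recorded as a replacecommit rather than a rewrite of the base path. A short sketch that mirrors the test code (basePath is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}

object TimelineCheck {
  // Prints the completed actions on the timeline and whether a replacecommit is present.
  def printCompletedActions(hadoopConf: Configuration, basePath: String): Unit = {
    val metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true)
    val actions = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
      .map(_.asInstanceOf[HoodieInstant].getAction)
    actions.foreach(println)
    println(s"has replacecommit: ${actions.contains(HoodieTimeline.REPLACE_COMMIT_ACTION)}")
  }
}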