@@ -205,6 +205,17 @@ public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext con
*/
public abstract HoodieWriteMetadata<O> insertOverwrite(HoodieEngineContext context, String instantTime, I records);

/**
* Deletes all the existing records of the Hoodie table and inserts the specified new records into the table at the supplied instantTime.
* Unlike insertOverwrite, this replaces every partition of the table, not only the partitions present in the input records.
*
* @param context HoodieEngineContext
* @param instantTime Instant time for the replace action
* @param records input records
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);

public HoodieWriteConfig getConfig() {
return config;
}
@@ -106,6 +106,11 @@ public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(HoodieEngineContex
throw new HoodieNotSupportedException("InsertOverWrite is not supported yet");
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, List<HoodieRecord<T>> records) {
throw new HoodieNotSupportedException("insertOverwriteTable is not supported yet");
}

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
@@ -191,6 +191,23 @@ public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}


/**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
*
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return HoodieWriteResult carrying the JavaRDD of WriteStatus (to inspect errors and counts) and the replaced file IDs per partition
*/
public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
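
For context, a minimal sketch (not part of the patch) of how a Spark job might call this new client method; writeConfig, jsc, and records are assumed to already exist, and timeline bookkeeping such as starting the replacecommit instant is elided:

// Hedged sketch: writeConfig (HoodieWriteConfig), jsc (JavaSparkContext) and
// records (JavaRDD[HoodieRecord[OverwriteWithLatestAvroPayload]]) are assumed.
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline

val client = new SparkRDDWriteClient[OverwriteWithLatestAvroPayload](new HoodieSparkEngineContext(jsc), writeConfig)

// In a real job the replacecommit instant also has to be started on the timeline
// (e.g. via startCommitWithTime); that bookkeeping is elided here.
val instantTime = HoodieActiveTimeline.createNewInstantTime()

val result = client.insertOverwriteTable(records, instantTime)
// The HoodieWriteResult carries both the write statuses and the replaced file IDs.
val replacedFileIds = result.getPartitionToReplaceFileIds()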

@Override
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
return bulkInsert(records, instantTime, Option.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hudi.table.action.bootstrap.SparkBootstrapCommitActionExecutor;
import org.apache.hudi.table.action.clean.SparkCleanActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
@@ -129,6 +130,11 @@ public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String i
return new SparkInsertOverwriteCommitActionExecutor(context, config, this, instantTime, records).execute();
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
}

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
@@ -44,7 +44,14 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
}

public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
WriteOperationType writeOperationType) {
super(context, config, table, instantTime, writeOperationType);
this.inputRecordsRDD = inputRecordsRDD;
}

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieEngineContext;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends SparkInsertOverwriteCommitActionExecutor<T> {

public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
}

protected List<String> getAllExistingFileIds(String partitionPath) {
return table.getSliceView().getLatestFileSlices(partitionPath)
.map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
}

@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(JavaRDD<WriteStatus> writeStatuses) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
try {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(table.getMetaClient().getFs(),
table.getMetaClient().getBasePath(), false);
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
if (partitionPaths != null && partitionPaths.size() > 0) {
context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
partitionToExistingFileIds = partitionPathRdd.mapToPair(
partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
}
} catch (IOException e) {
throw new HoodieCommitException("Insert overwrite table action failed to fetch existing fileIds for all partitions of "
+ config.getBasePath() + " at time " + instantTime, e);
}
return partitionToExistingFileIds;
}
}
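
For intuition, the map returned above pairs every partition path in the table with all of its current file IDs; a purely illustrative sketch of the shape (the paths and IDs below are invented):

// Illustrative only -- partition paths and file IDs are made up for the example.
val partitionToReplacedFileIds: Map[String, List[String]] = Map(
  "2020/03/11" -> List("57d0a8ca-0001", "57d0a8ca-0002"),
  "2020/09/11" -> List("9bde31a3-0003")
)
// Once the replacecommit completes, every file group listed here is treated as
// replaced, so snapshot reads only see the newly inserted file groups.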
@@ -38,10 +38,12 @@ public enum WriteOperationType {
// delete
DELETE("delete"),
BOOTSTRAP("bootstrap"),
// insert overwrite
// insert overwrite: replaces only the partitions present in the input records
INSERT_OVERWRITE("insert_overwrite"),
// cluster
CLUSTER("cluster"),
// insert overwrite table: replaces every partition in the table
INSERT_OVERWRITE_TABLE("insert_overwrite_table"),
// used for old version
UNKNOWN("unknown");

@@ -72,6 +74,8 @@ public static WriteOperationType fromValue(String value) {
return DELETE;
case "insert_overwrite":
return INSERT_OVERWRITE;
case "insert_overwrite_table":
return INSERT_OVERWRITE_TABLE;
default:
throw new HoodieException("Invalid value of Type.");
}
@@ -88,4 +92,4 @@ public String value() {
public static boolean isChangingRecords(WriteOperationType operationType) {
return operationType == UPSERT || operationType == UPSERT_PREPPED || operationType == DELETE;
}
}
}
@@ -451,6 +451,10 @@ public List<HoodieRecord> generateInsertsContainsAllPartitions(String instantTim
return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, true).collect(Collectors.toList());
}

public List<HoodieRecord> generateInsertsForPartition(String instantTime, Integer n, String partition) {
return generateInsertsStream(instantTime, n, false, TRIP_EXAMPLE_SCHEMA, false, () -> partition, () -> UUID.randomUUID().toString()).collect(Collectors.toList());
}

public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer n, boolean isFlattened, String schemaStr, boolean containsAllPartitions) {
return generateInsertsStream(commitTime, n, isFlattened, schemaStr, containsAllPartitions,
() -> partitionPaths[RAND.nextInt(partitionPaths.length)],
@@ -86,7 +86,7 @@ public static InputSplit[] getRealtimeSplits(Configuration conf, Stream<FileSpli
.collect(Collectors.groupingBy(split -> FSUtils.getFileId(split.getPath().getName())));
// Get the maxCommit from the last delta or compaction or commit - when bootstrapped from COW table
String maxCommitTime = metaClient.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().getTimestamp();
latestFileSlices.forEach(fileSlice -> {
List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
@@ -191,7 +191,7 @@ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, Stri
}

public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
if (operation == WriteOperationType.INSERT_OVERWRITE) {
if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE) {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
} else {
return CommitUtils.getCommitActionType(tableType);
@@ -211,6 +211,8 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav
return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
case INSERT_OVERWRITE:
return client.insertOverwrite(hoodieRecords, instantTime);
case INSERT_OVERWRITE_TABLE:
return client.insertOverwriteTable(hoodieRecords, instantTime);
default:
throw new HoodieException("Not a valid operation type for doWriteOperation: " + operation.toString());
}
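
As a quick illustration of the routing these additions enable (only APIs visible in this diff are used; the table type is chosen arbitrarily):

import org.apache.hudi.DataSourceUtils
import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}

val op = WriteOperationType.fromValue("insert_overwrite_table")   // INSERT_OVERWRITE_TABLE
val action = DataSourceUtils.getCommitActionType(op, HoodieTableType.COPY_ON_WRITE)
// action == HoodieTimeline.REPLACE_COMMIT_ACTION ("replacecommit"), and
// doWriteOperation(...) dispatches the write to client.insertOverwriteTable(...)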
@@ -74,7 +74,8 @@ public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, Strin
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
return metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
HoodieActiveTimeline.DELTA_COMMIT_ACTION)).filterCompletedInstants();
HoodieActiveTimeline.DELTA_COMMIT_ACTION,
HoodieActiveTimeline.REPLACE_COMMIT_ACTION)).filterCompletedInstants();
} else {
return metaClient.getCommitTimeline().filterCompletedInstants();
}
@@ -157,6 +157,7 @@ object DataSourceWriteOptions {
val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value
val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL

/**
@@ -93,6 +93,13 @@ private[hudi] object HoodieSparkSqlWriter {
operation = WriteOperationType.INSERT
}

// If the save mode is Overwrite, switch the operation to INSERT_OVERWRITE_TABLE.
// DataSourceUtils.doWriteOperation will then use client.insertOverwriteTable to overwrite
// the table, replacing the old fs.delete(tablePath) behavior.
if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
operation = WriteOperationType.INSERT_OVERWRITE_TABLE
}

val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(path.get)
val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -319,10 +326,6 @@
if (operation != WriteOperationType.DELETE) {
if (mode == SaveMode.ErrorIfExists && tableExists) {
throw new HoodieException(s"hoodie table at $tablePath already exists.")
} else if (mode == SaveMode.Overwrite && tableExists) {
log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
fs.delete(tablePath, true)
tableExists = false
}
} else {
// Delete Operation only supports Append mode
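To make the behavior change concrete, a hedged quickstart-style sketch (df, basePath and the table name are assumed; the option keys are the existing DataSourceWriteOptions/HoodieWriteConfig constants plus the new operation value from this PR):

// Hedged sketch, not part of the patch: df (a DataFrame) and basePath are assumed.
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

df.write.format("org.apache.hudi")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(HoodieWriteConfig.TABLE_NAME, "hudi_trips_cow")
  .mode(SaveMode.Overwrite)   // now mapped to INSERT_OVERWRITE_TABLE, recorded as a replacecommit
  .save(basePath)             // previously this path deleted the table directory and re-inserted

// Explicitly requesting the operation with Append mode works as well:
df.write.format("org.apache.hudi")
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(HoodieWriteConfig.TABLE_NAME, "hudi_trips_cow")
  .option(OPERATION_OPT_KEY, INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(basePath)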
@@ -18,7 +18,13 @@
package org.apache.hudi.functional

import java.sql.{Date, Timestamp}
import java.util.function.Supplier
import java.util.stream.Stream

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -156,6 +162,79 @@ class TestCOWDataSource extends HoodieClientTestBase {
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
}

@Test def testOverWriteModeUseReplaceAction(): Unit = {

Review comment (Member):

I ran a similar test using the quick start setup guide:

1. Ingest 10 records for partition "2020/03/11":

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
    inserts: java.util.List[String] = [{"ts": 0, "uuid": "299d5202-1ea0-4918-9d2f-2365bc1c2402", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "2020/03/11"}, {"ts": 0, "uuid": "0fc23a14-c815-4b09-bff1-c6193a6de5b7", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "2020/03/11"}, {"ts": 0, "uuid": "7136e8f8-ed82-4fc4-b60d-f7367f7be791", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.5731835407930634, "begin_lon": 0.4923479652912024, "end_lat":...
    scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 1))
    warning: there was one deprecation warning; re-run with -deprecation for details
    df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("org.apache.hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Overwrite).
| save(basePath)

2. Query the data back and see that records are written correctly:

scala> val tripsSnapshotDF = spark.
    | read.
    | format("org.apache.hudi").
| load(basePath + "/*/*/*/*")
    20/11/04 14:51:53 WARN DefaultSource: Loading Base File Only View.
    tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

scala>

scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
| 20201104145141|299d5202-1ea0-491...| 2020/03/11|rider-213|driver-213|34.158284716382845|
| 20201104145141|0fc23a14-c815-4b0...| 2020/03/11|rider-213|driver-213| 43.4923811219014|
| 20201104145141|7136e8f8-ed82-4fc...| 2020/03/11|rider-213|driver-213| 64.27696295884016|
| 20201104145141|5ffa488e-d75e-4ef...| 2020/03/11|rider-213|driver-213| 93.56018115236618|
| 20201104145141|cf09166f-dc3f-45e...| 2020/03/11|rider-213|driver-213|17.851135255091155|
| 20201104145141|6f522490-e29e-419...| 2020/03/11|rider-213|driver-213|19.179139106643607|
| 20201104145141|db97e3ef-ad7a-4e8...| 2020/03/11|rider-213|driver-213| 33.92216483948643|
| 20201104145141|a42d7c22-d0bf-4b9...| 2020/03/11|rider-213|driver-213| 66.62084366450246|
| 20201104145141|94154d3e-c3da-436...| 2020/03/11|rider-213|driver-213| 41.06290929046368|
| 20201104145141|618b3f38-bb71-402...| 2020/03/11|rider-213|driver-213| 27.79478688582596|
+-------------------+--------------------+----------------------+---------+----------+------------------+

3. Use insert_overwrite and write to new partition (This is with master, not using your change. So I still have insert_overwrite operation with Append mode):

scala> val dataGen = new DataGenerator(Array("2020/09/11"))
    dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@f00b18a

scala> val inserts2 = convertToStringList(dataGen.generateInserts(1))
scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts2, 1))
scala> df.write.format("org.apache.hudi").
| options(getQuickstartWriteConfigs).
| option(PRECOMBINE_FIELD_OPT_KEY, "ts").
| option(RECORDKEY_FIELD_OPT_KEY, "uuid").
| option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
| option(TABLE_NAME, tableName).
| mode(Append).
| option(OPERATION_OPT_KEY, "insert_overwrite").
| save(basePath)

4. Query the data back:

scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
| 20201104145258|299d5202-1ea0-491...| 2020/03/11|rider-213|driver-213|34.158284716382845|
| 20201104145258|0fc23a14-c815-4b0...| 2020/03/11|rider-213|driver-213| 43.4923811219014|
| 20201104145258|7136e8f8-ed82-4fc...| 2020/03/11|rider-213|driver-213| 64.27696295884016|
| 20201104145258|5ffa488e-d75e-4ef...| 2020/03/11|rider-213|driver-213| 93.56018115236618|
| 20201104145258|cf09166f-dc3f-45e...| 2020/03/11|rider-213|driver-213|17.851135255091155|
| 20201104145258|6f522490-e29e-419...| 2020/03/11|rider-213|driver-213|19.179139106643607|
| 20201104145258|db97e3ef-ad7a-4e8...| 2020/03/11|rider-213|driver-213| 33.92216483948643|
| 20201104145258|a42d7c22-d0bf-4b9...| 2020/03/11|rider-213|driver-213| 66.62084366450246|
| 20201104145258|94154d3e-c3da-436...| 2020/03/11|rider-213|driver-213| 41.06290929046368|
| 20201104145258|618b3f38-bb71-402...| 2020/03/11|rider-213|driver-213| 27.79478688582596|
| 20201104145348|4dac8aa3-b8fa-410...| 2020/09/11|rider-284|driver-284|49.527694252432056|
+-------------------+--------------------+----------------------+---------+----------+------------------+

As you can see in step4, we see all 11 records. With 'SaveMode.Overwrite' we should only see 1 record. Hope this is clear.

val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)

val records2 = recordsToStrings(dataGen.generateInserts("002", 5)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
assertEquals(2, commits.size)
assertEquals("commit", commits(0))
assertEquals("replacecommit", commits(1))
}

@Test def testOverWriteModeUseReplaceActionOnDisJointPartitions(): Unit = {
// step1: Write 5 records to hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 5, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)

// step2: Write 7 more records using SaveMode.Overwrite for partition2 DEFAULT_SECOND_PARTITION_PATH
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 7, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Overwrite)
.save(basePath)

val allRecords = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*")
allRecords.registerTempTable("tmpTable")

spark.sql(String.format("select count(*) from tmpTable")).show()

// step3: Query the rows count from hoodie table for partition1 DEFAULT_FIRST_PARTITION_PATH
val recordCountForPartition1 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).collect()
Review comment (Member):
tmpTable only registered inputDF2, so you will not get data for partition1 even if we do SaveMode.Append in line 206? Don't you need to read back all data from table? Can you please fix test setup?

Reply (Contributor Author):
will fix it

assertEquals("0", recordCountForParititon1(0).get(0).toString)

// step4: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH
val recordCountForPartition2 = spark.sql(String.format("select count(*) from tmpTable where partition = '%s'", HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).collect()
assertEquals("7", recordCountForPartition2(0).get(0).toString)

// step5: Query the rows count from hoodie table
val recordCount = spark.sql(String.format("select count(*) from tmpTable")).collect()
assertEquals("7", recordCount(0).get(0).toString)

// step6: Query the rows count from hoodie table for partition2 DEFAULT_SECOND_PARTITION_PATH using spark.collect and then filter mode
val recordsForPartitionColumn = spark.sql(String.format("select partition from tmpTable")).collect()
val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
assertEquals(7, filterSecondPartitionCount)

val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertEquals(2, commits.size)
assertEquals("commit", commits(0))
assertEquals("replacecommit", commits(1))
}

@Test def testDropInsertDup(): Unit = {
val insert1Cnt = 10
val insert2DupKeyCnt = 9