diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ceac9eb2cd10f..eb0c6ea899bcc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,7 +18,9 @@
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -67,6 +69,7 @@
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -98,8 +101,19 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
   protected SerializableConfiguration hadoopConf;
   protected final transient HoodieEngineContext engineContext;
 
-  protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig,
-                                            HoodieEngineContext engineContext) {
+  /**
+   * Hudi backed table metadata writer.
+   *
+   * @param hadoopConf     - Hadoop configuration to use for the metadata writer
+   * @param writeConfig    - Writer config
+   * @param engineContext  - Engine context
+   * @param actionMetadata - Optional action metadata to help decide bootstrap operations
+   * @param <T>            - Action metadata types extending Avro generated SpecificRecordBase
+   */
+  protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configuration hadoopConf,
+                                                                           HoodieWriteConfig writeConfig,
+                                                                           HoodieEngineContext engineContext,
+                                                                           Option<T> actionMetadata) {
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
    this.hadoopConf = new SerializableConfiguration(hadoopConf);
@@ -110,15 +124,20 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteC
       enabled = true;
 
       // Inline compaction and auto clean is required as we dont expose this table outside
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), "Cleaning is controlled internally for Metadata table.");
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), "Compaction is controlled internally for metadata table.");
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
+          "Cleaning is controlled internally for Metadata table.");
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
+          "Compaction is controlled internally for metadata table.");
       // Metadata Table cannot have metadata listing turned on. (infinite loop, much?)
-      ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), "Auto commit is required for Metadata Table");
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(), "File listing cannot be used for Metadata Table");
+      ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
+          "Auto commit is required for Metadata Table");
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
+          "File listing cannot be used for Metadata Table");
 
       initRegistry();
-      this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
-      initialize(engineContext);
+      this.dataMetaClient =
+          HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
+      initialize(engineContext, actionMetadata);
       initTableMetadata();
     } else {
       enabled = false;
@@ -215,10 +234,11 @@ public HoodieBackedTableMetadata metadata() {
 
   /**
    * Initialize the metadata table if it does not exist.
-   *
-   * If the metadata table did not exist, then file and partition listing is used to bootstrap the table.
+   * <p>
+   * If the metadata table does not exist, then file and partition listing is used to bootstrap the table.
    */
-  protected abstract void initialize(HoodieEngineContext engineContext);
+  protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
+                                                                    Option<T> actionMetadata);
 
   public void initTableMetadata() {
     try {
@@ -233,26 +253,33 @@ public void initTableMetadata() {
     }
   }
 
-  protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableMetaClient dataMetaClient) throws IOException {
+  /**
+   * Bootstrap the metadata table if needed.
+   *
+   * @param engineContext  - Engine context
+   * @param dataMetaClient - Meta client for the data table
+   * @param actionMetadata - Optional action metadata
+   * @param <T>            - Action metadata types extending Avro generated SpecificRecordBase
+   * @throws IOException on failures while bootstrapping
+   */
+  protected <T extends SpecificRecordBase> void bootstrapIfNeeded(HoodieEngineContext engineContext,
+                                                                  HoodieTableMetaClient dataMetaClient,
+                                                                  Option<T> actionMetadata) throws IOException {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+
+    boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(),
+        HoodieTableMetaClient.METAFOLDER_NAME));
     boolean rebootstrap = false;
+
+    // If the un-synced instants have been archived, then
+    // the metadata table will need to be bootstrapped again.
     if (exists) {
-      // If the un-synched instants have been archived then the metadata table will need to be bootstrapped again
-      HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get())
+      final HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get())
           .setBasePath(metadataWriteConfig.getBasePath()).build();
-      Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      if (!latestMetadataInstant.isPresent()) {
-        LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
-        rebootstrap = true;
-      } else if (!latestMetadataInstant.get().getTimestamp().equals(SOLO_COMMIT_TIMESTAMP)
-          && dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp())) {
-        // TODO: Revisit this logic and validate that filtering for all commits timeline is the right thing to do
-        LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
-            + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
-            + ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
-        rebootstrap = true;
-      }
+      final Option<HoodieInstant> latestMetadataInstant =
+          metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+
+      rebootstrap = isBootstrapNeeded(latestMetadataInstant, actionMetadata);
     }
 
     if (rebootstrap) {
@@ -270,6 +297,56 @@ protected void bootstrapIfNeeded(HoodieEngineContext engineContext, HoodieTableM
     }
   }
 
+  /**
+   * Whether the bootstrap operation is needed for this metadata table.
+   * <p>
+   * Rollback of the first commit would look like un-synced instants in the metadata table.
+   * Action metadata is needed to verify the instant time and avoid erroneous bootstrapping.
+   * <p>
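+   * For example, the first commit on the data table may be synced to the metadata table and then
+   * rolled back; the data timeline would then start after that instant, which looks the same as
+   * the instant having been archived unless the rollback metadata is checked.
+   * <p>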
+   * TODO: Revisit this logic and validate that filtering for all
+   * commits timeline is the right thing to do
+   *
+   * @return True if the bootstrap is needed, False otherwise
+   */
+  private <T extends SpecificRecordBase> boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant,
+                                                                   Option<T> actionMetadata) {
+    if (!latestMetadataInstant.isPresent()) {
+      LOG.warn("Metadata Table will need to be re-bootstrapped as no instants were found");
+      return true;
+    }
+
+    final String latestMetadataInstantTimestamp = latestMetadataInstant.get().getTimestamp();
+    if (latestMetadataInstantTimestamp.equals(SOLO_COMMIT_TIMESTAMP)) {
+      return false;
+    }
+
+    boolean isRollbackAction = false;
+    List<String> rollbackedTimestamps = Collections.emptyList();
+    if (actionMetadata.isPresent() && actionMetadata.get() instanceof HoodieRollbackMetadata) {
+      isRollbackAction = true;
+      List<HoodieInstantInfo> rollbackedInstants =
+          ((HoodieRollbackMetadata) actionMetadata.get()).getInstantsRollback();
+      rollbackedTimestamps = rollbackedInstants.stream().map(instant -> {
+        return instant.getCommitTime().toString();
+      }).collect(Collectors.toList());
+    }
+
+    if (dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
+        latestMetadataInstant.get().getTimestamp())
+        && (!isRollbackAction || !rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) {
+      LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced instants have been archived."
+          + " latestMetadataInstant=" + latestMetadataInstant.get().getTimestamp()
+          + ", latestDataInstant=" + dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
+      return true;
+    }
+
+    return false;
+  }
+
   /**
    * Initialize the Metadata Table by listing files and partitions from the file system.
    *
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 994c74b02e0c3..e7e84b86d882d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -706,11 +707,25 @@ public HoodieEngineContext getContext() {
   }
 
   /**
-   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   * Get the table metadata writer.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  public final Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    return getMetadataWriter(Option.empty());
+  }
+
+  /**
+   * Get the table metadata writer.
+   *
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
-    // Each engine is expected to override this and provide the actual metadata writer if enabled.
+  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<R> actionMetadata) {
+    // Each engine is expected to override this and
+    // provide the actual metadata writer, if enabled.
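+    // (The rollback executor, for example, passes its HoodieRollbackMetadata down so that the
+    // writer can tell a rolled-back first commit apart from an archived un-synced instant.)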
     return Option.empty();
   }
+
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 73083cdecabd3..cd32a5bc87307 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -72,7 +73,7 @@ protected final void writeTableMetadata(HoodieCleanMetadata metadata) {
    * @param metadata rollback metadata of interest.
    */
   protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
-    table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
+    table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> w.update(metadata, instantTime));
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index c19c6fa4560a6..9ae3e622d35da 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -45,12 +46,23 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
 
   private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
 
-  public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
-    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
+  public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
+                                                 HoodieEngineContext context) {
+    return create(conf, writeConfig, context, Option.empty());
   }
 
-  FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
-    super(hadoopConf, writeConfig, engineContext);
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
+                                                                                HoodieWriteConfig writeConfig,
+                                                                                HoodieEngineContext context,
+                                                                                Option<T> actionMetadata) {
+    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
+  }
+
+  <T extends SpecificRecordBase> FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
+                                                                      HoodieWriteConfig writeConfig,
+                                                                      HoodieEngineContext engineContext,
+                                                                      Option<T> actionMetadata) {
+    super(hadoopConf, writeConfig, engineContext, actionMetadata);
   }
 
   @Override
@@ -65,10 +77,11 @@ protected void initRegistry() {
   }
 
   @Override
-  protected void initialize(HoodieEngineContext engineContext) {
+  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
+                                                           Option<T> actionMetadata) {
     try {
       if (enabled) {
-        bootstrapIfNeeded(engineContext, dataMetaClient);
+        bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
       }
     } catch (IOException e) {
       LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index d7eed45dfe00e..bade5b3d91a15 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -98,11 +99,11 @@ protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
 
   @Override
-  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<R> actionMetadata) {
     synchronized (this) {
       if (!isMetadataAvailabilityUpdated) {
-        // this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
-        // this is done to avoid repeated calls to fs.exists().
+        // This code assumes that if metadata availability is updated once it will not change.
+        // Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists().
         try {
           isMetadataTableAvailable = config.isMetadataTableEnabled()
               && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 3324455a09807..e59e195836149 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -47,12 +48,23 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
 
   private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
 
-  public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
-    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context);
+  public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
+                                                 HoodieEngineContext context) {
+    return create(conf, writeConfig, context, Option.empty());
   }
 
-  SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
-    super(hadoopConf, writeConfig, engineContext);
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter create(Configuration conf,
+                                                                                HoodieWriteConfig writeConfig,
+                                                                                HoodieEngineContext context,
+                                                                                Option<T> actionMetadata) {
+    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, context, actionMetadata);
+  }
+
+  <T extends SpecificRecordBase> SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
+                                                                      HoodieWriteConfig writeConfig,
+                                                                      HoodieEngineContext engineContext,
+                                                                      Option<T> actionMetadata) {
+    super(hadoopConf, writeConfig, engineContext, actionMetadata);
   }
 
   @Override
@@ -71,7 +83,8 @@ protected void initRegistry() {
   }
 
   @Override
-  protected void initialize(HoodieEngineContext engineContext) {
+  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext,
+                                                           Option<T> actionMetadata) {
     try {
       metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
         if (registry instanceof DistributedRegistry) {
@@ -81,7 +94,7 @@ protected void initialize(HoodieEngineContext engineContext) {
       });
 
       if (enabled) {
-        bootstrapIfNeeded(engineContext, dataMetaClient);
+        bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
       }
     } catch (IOException e) {
       LOG.error("Failed to initialize metadata table. Disabling the writer.", e);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index a7b14be5f5c38..ca5c4515605c5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -18,6 +18,7 @@
 package org.apache.hudi.table;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -102,11 +103,11 @@ protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>>
 
   @Override
-  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+  public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(Option<R> actionMetadata) {
     synchronized (this) {
       if (!isMetadataAvailabilityUpdated) {
-        // this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
-        // this is done to avoid repeated calls to fs.exists().
+        // This code assumes that if metadata availability is updated once it will not change.
+        // Please revisit this logic if that's not the case. This is done to avoid repeated calls to fs.exists().
         try {
           isMetadataTableAvailable = config.isMetadataTableEnabled()
               && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
@@ -117,7 +118,8 @@ public Option<HoodieTableMetadataWriter> getMetadataWriter() {
       }
     }
     if (isMetadataTableAvailable) {
-      return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
+      return Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context,
+          actionMetadata));
     } else {
       return Option.empty();
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
index bd0961d227476..0b0f356f37cd2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
@@ -337,7 +337,7 @@ public void testTagLocationAndPartitionPathUpdateWithExplicitRollback() throws E
   public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
     // Load to memory
     HoodieWriteConfig config = getConfigBuilder(100, false, false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
@@ -425,7 +425,7 @@ public void testSimpleTagLocationWithInvalidCommit() throws Exception {
   public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
     // Load to memory
     HoodieWriteConfig config = getConfigBuilder(100, false, false)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index cdda6ff874aa4..7ea9766170a70 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -459,6 +459,42 @@ public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) thro
 
   // Some operations are not feasible with test table infra. hence using write client to test those cases.
 
+  /**
+   * Rollback of the first commit should not trigger bootstrap errors at the metadata table.
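+   * <p>
+   * The rolled-back instant would otherwise look like an archived un-synced instant and force an
+   * unnecessary re-bootstrap; the rollback action metadata passed to the writer prevents that.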
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testFirstCommitRollback(HoodieTableType tableType) throws Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true))) {
+
+      // Write 1
+      String commitTime = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
+      client.startCommitWithTime(commitTime);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Rollback the first commit
+      client.rollback(commitTime);
+
+      // Write 2
+      commitTime = "0000002";
+      records = dataGen.generateInserts(commitTime, 10);
+      client.startCommitWithTime(commitTime);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+  }
+
+
   /**
    * Test several table operations with restore. This test uses SparkRDDWriteClient.
    * Once the restore support is ready in HoodieTestTable, then rewrite this test.