diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index acdc139cf7c58..3b68659abf798 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -72,7 +73,12 @@ protected void preExecute() {
     writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
   }
 
-  protected abstract Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted);
+  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
+    table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
+        getCommitActionType(), instantTime), Option.empty());
+    return Option.of(HoodieDatasetBulkInsertHelper
+        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
+  }
 
   protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
     writeClient.postWrite(result, instantTime, table);
@@ -132,9 +138,7 @@ protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMet
     }
   }
 
-  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    return Collections.emptyMap();
-  }
+  protected abstract Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses);
 
   public String getInstantTime() {
     return instantTime;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index 62d89b2c0341c..8b124de565ca1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -18,27 +18,15 @@
 
 package org.apache.hudi.commit;
 
-import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
@@ -47,47 +35,12 @@ public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
     super(config, writeClient);
   }
 
-  @Override
-  protected void preExecute() {
-    instantTime = writeClient.startCommit();
-    table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
-  }
-
-  @Override
-  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
-    Map<String, String> opts = writeConfig.getProps().entrySet().stream().collect(Collectors.toMap(
-        e -> String.valueOf(e.getKey()),
-        e -> String.valueOf(e.getValue())));
-    Map<String, String> optsOverrides = Collections.singletonMap(
-        HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, String.valueOf(arePartitionRecordsSorted));
-
-    String targetFormat;
-    Map<String, String> customOpts = new HashMap<>(1);
-    if (HoodieSparkUtils.isSpark3()) {
-      targetFormat = "org.apache.hudi.spark.internal";
-      customOpts.put(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key(), records.schema().json());
-    } else {
-      throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
-          + " To use row writer please switch to spark 3");
-    }
-
-    records.write().format(targetFormat)
-        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
-        .options(opts)
-        .options(customOpts)
-        .options(optsOverrides)
-        .mode(SaveMode.Append)
-        .save();
-    return Option.empty();
-  }
-
-  @Override
-  protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
-    // no op
-  }
-
   @Override
   public WriteOperationType getWriteOperationType() {
     return WriteOperationType.BULK_INSERT;
   }
+
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+    return Collections.emptyMap();
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index f5f5d47e6cc34..8e183629f48ba 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -18,17 +18,15 @@
 
 package org.apache.hudi.commit;
 
-import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Executor to be used by stream sync. Directly invokes HoodieDatasetBulkInsertHelper.bulkInsert so that WriteStatus is
@@ -40,16 +38,12 @@ public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig con
     super(config, writeClient);
   }
 
-  @Override
-  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
-    table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
-        getCommitActionType(), instantTime), Option.empty());
-    return Option.of(HoodieDatasetBulkInsertHelper
-        .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
-  }
-
   @Override
   public WriteOperationType getWriteOperationType() {
     return WriteOperationType.BULK_INSERT;
   }
+
+  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+    return Collections.emptyMap();
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6c0ad0c8ac049..e6588053425be 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -841,17 +841,9 @@ class HoodieSparkSqlWriterInternal {
     val instantTime = executor.getInstantTime
 
     try {
-      val (writeSuccessful, compactionInstant, clusteringInstant) = mode match {
-        case _ if overwriteOperationType == null =>
-          val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
-          (syncHiveSuccess, HOption.empty().asInstanceOf[HOption[String]], HOption.empty().asInstanceOf[HOption[String]])
-        case _ =>
-          try {
-            commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
-              TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
-
-          }
-      }
+      val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(
+        sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
+        TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
       (writeSuccessful, HOption.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
     } finally {
       // close the write client in all cases
@@ -993,11 +985,13 @@ class HoodieSparkSqlWriterInternal {
                            ): (Boolean, HOption[java.lang.String], HOption[java.lang.String]) = {
     if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
       log.info("Proceeding to commit the write.")
-      val metaMap = parameters.filter(kv =>
-        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
+      // get extra metadata from props
+      // 1. properties starting with commit metadata key prefix
+      // 2. properties related to checkpoint in spark streaming
+      val extraMetadataOpt = common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
       val commitSuccess =
         client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
-          common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)),
+          extraMetadataOpt,
           tableInstantInfo.commitActionType,
           writeResult.getPartitionToReplaceFileIds,
           common.util.Option.ofNullable(extraPreCommitFn.orNull))
@@ -1012,7 +1006,7 @@ class HoodieSparkSqlWriterInternal {
       val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
       val compactionInstant: common.util.Option[java.lang.String] =
         if (asyncCompactionEnabled) {
-          client.scheduleCompaction(common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)))
+          client.scheduleCompaction(extraMetadataOpt)
         } else {
           common.util.Option.empty()
         }
@@ -1022,7 +1016,7 @@ class HoodieSparkSqlWriterInternal {
       val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
       val clusteringInstant: common.util.Option[java.lang.String] =
         if (asyncClusteringEnabled) {
-          client.scheduleClustering(common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)))
+          client.scheduleClustering(extraMetadataOpt)
         } else {
           common.util.Option.empty()
         }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
index 98b4be1393b38..0255b22c77e92 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
@@ -93,7 +93,7 @@ class TestSparkDataSourceDAGExecution extends HoodieSparkClientTestBase with Sca
   @CsvSource(Array(
     "upsert,org.apache.hudi.client.SparkRDDWriteClient.commit",
     "insert,org.apache.hudi.client.SparkRDDWriteClient.commit",
-    "bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow"))
+    "bulk_insert,org.apache.hudi.client.SparkRDDWriteClient.commit"))
   def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: String): Unit = {
     // register stage event listeners
     val stageListener = new StageListener(event)