Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
Expand Down Expand Up @@ -72,7 +73,12 @@ protected void preExecute() {
writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
}

protected abstract Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted);
/**
 * Moves the requested instant to inflight on the active timeline, then performs the
 * row-based bulk insert through {@code HoodieDatasetBulkInsertHelper}.
 *
 * @param records dataset of rows to bulk insert
 * @param arePartitionRecordsSorted whether the records are already sorted within partitions
 * @return the write statuses produced by the bulk insert, wrapped in an {@link Option}
 */
protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
  HoodieInstant requestedInstant = table.getMetaClient()
      .createNewInstant(HoodieInstant.State.REQUESTED, getCommitActionType(), instantTime);
  table.getActiveTimeline().transitionRequestedToInflight(requestedInstant, Option.empty());
  HoodieData<WriteStatus> writeStatuses = HoodieDatasetBulkInsertHelper.bulkInsert(
      records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType());
  return Option.of(writeStatuses);
}

protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
writeClient.postWrite(result, instantTime, table);
Expand Down Expand Up @@ -132,9 +138,7 @@ protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMet
}
}

/**
 * Returns the mapping of partition paths to the file ids replaced by this commit.
 * Plain bulk insert replaces no existing file groups, so the default is an empty map;
 * replace/overwrite-style executors are expected to override this.
 */
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
return Collections.emptyMap();
}
protected abstract Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses);

public String getInstantTime() {
return instantTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,15 @@

package org.apache.hudi.commit;

import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.DataSourceInternalWriterHelper;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {

Expand All @@ -47,47 +35,12 @@ public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
super(config, writeClient);
}

@Override
protected void preExecute() {
// Start a fresh commit on the timeline and initialize the table for this write operation.
instantTime = writeClient.startCommit();
table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
}

@Override
protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
Map<String, String> opts = writeConfig.getProps().entrySet().stream().collect(Collectors.toMap(
e -> String.valueOf(e.getKey()),
e -> String.valueOf(e.getValue())));
Map<String, String> optsOverrides = Collections.singletonMap(
HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, String.valueOf(arePartitionRecordsSorted));

String targetFormat;
Map<String, String> customOpts = new HashMap<>(1);
if (HoodieSparkUtils.isSpark3()) {
targetFormat = "org.apache.hudi.spark.internal";
customOpts.put(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key(), records.schema().json());
} else {
throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
+ " To use row writer please switch to spark 3");
}

records.write().format(targetFormat)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate why logic is customized before for this executor?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you elaborate why logic is customized before for this executor?

I think the timeline is like this.
First, there was the normal bulk insert logic, which at that time used the Data Source V2 interface directly to perform writes.
Later, boneanxs proposed using bulk insert for other operations such as overwrite, but the code paths were not unified at that time; instead, this part of the logic was kept separate.
You can refer to: #8076
image

.option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
.options(opts)
.options(customOpts)
.options(optsOverrides)
.mode(SaveMode.Append)
.save();
return Option.empty();
}

@Override
protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
// Intentionally a no-op: post-write handling for this executor is presumably
// performed by the caller — TODO(review) confirm against the invoking write path.
}

@Override
public WriteOperationType getWriteOperationType() {
// This executor always performs a plain bulk insert.
return WriteOperationType.BULK_INSERT;
}

/**
 * Returns the mapping of partition paths to file ids replaced by this commit.
 * A plain bulk insert does not replace any existing file groups, hence the empty map.
 */
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

package org.apache.hudi.commit;

import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Executor to be used by stream sync. Directly invokes HoodieDatasetBulkInsertHelper.bulkInsert so that WriteStatus is
Expand All @@ -40,16 +38,12 @@ public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig con
super(config, writeClient);
}

@Override
protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
getCommitActionType(), instantTime), Option.empty());
return Option.of(HoodieDatasetBulkInsertHelper
.bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
}

@Override
public WriteOperationType getWriteOperationType() {
// The streamer executor also performs a plain bulk insert.
return WriteOperationType.BULK_INSERT;
}

/**
 * Returns the mapping of partition paths to file ids replaced by this commit.
 * Bulk insert via the streamer path does not replace existing file groups, hence the empty map.
 */
@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -841,17 +841,9 @@ class HoodieSparkSqlWriterInternal {
val instantTime = executor.getInstantTime

try {
val (writeSuccessful, compactionInstant, clusteringInstant) = mode match {
case _ if overwriteOperationType == null =>
val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
(syncHiveSuccess, HOption.empty().asInstanceOf[HOption[String]], HOption.empty().asInstanceOf[HOption[String]])
case _ =>
try {
commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)

}
}
val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the change because of overwriteOperationType never null?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image In the previous execution logic, the non-overwrite bulk insert write path already included the `commit` logic, so this code only needed to perform `meta sync`. Now that I have integrated all the bulk insert code paths, the `commit` logic for every case is carried out here.

sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
(writeSuccessful, HOption.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
} finally {
// close the write client in all cases
Expand Down Expand Up @@ -993,11 +985,13 @@ class HoodieSparkSqlWriterInternal {
): (Boolean, HOption[java.lang.String], HOption[java.lang.String]) = {
if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
log.info("Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
// get extra metadata from props
// 1. properties starting with commit metadata key prefix
// 2. properties related to checkpoint in spark streaming
val extraMetadataOpt = common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
val commitSuccess =
client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)),
extraMetadataOpt,
tableInstantInfo.commitActionType,
writeResult.getPartitionToReplaceFileIds,
common.util.Option.ofNullable(extraPreCommitFn.orNull))
Expand All @@ -1012,7 +1006,7 @@ class HoodieSparkSqlWriterInternal {
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
val compactionInstant: common.util.Option[java.lang.String] =
if (asyncCompactionEnabled) {
client.scheduleCompaction(common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)))
client.scheduleCompaction(extraMetadataOpt)
} else {
common.util.Option.empty()
}
Expand All @@ -1022,7 +1016,7 @@ class HoodieSparkSqlWriterInternal {
val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
val clusteringInstant: common.util.Option[java.lang.String] =
if (asyncClusteringEnabled) {
client.scheduleClustering(common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)))
client.scheduleClustering(extraMetadataOpt)
} else {
common.util.Option.empty()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class TestSparkDataSourceDAGExecution extends HoodieSparkClientTestBase with Sca
@CsvSource(Array(
"upsert,org.apache.hudi.client.SparkRDDWriteClient.commit",
"insert,org.apache.hudi.client.SparkRDDWriteClient.commit",
"bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow"))
"bulk_insert,org.apache.hudi.client.SparkRDDWriteClient.commit"))
def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: String): Unit = {
// register stage event listeners
val stageListener = new StageListener(event)
Expand Down
Loading