diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index e7f69b6e43958..9b4b1d49f113b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -513,7 +513,7 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType, * @param hoodieTable Hoodie Table * @return Write Status */ - protected abstract O postWrite(HoodieWriteMetadata result, String instantTime, HoodieTable hoodieTable); + public abstract O postWrite(HoodieWriteMetadata result, String instantTime, HoodieTable hoodieTable); /** * Post Commit Hook. Derived classes use this method to perform post-commit processing diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java index 22f7eb80b60eb..797df196441a7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieInternalConfig.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.WriteOperationType; /** * Configs/params used for internal purposes. @@ -37,6 +38,14 @@ public class HoodieInternalConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Schema set for row writer/bulk insert."); + public static final ConfigProperty BULKINSERT_OVERWRITE_OPERATION_TYPE = ConfigProperty + .key("hoodie.bulkinsert.overwrite.operation.type") + .noDefaultValue() + .markAdvanced() + .withValidValues(WriteOperationType.INSERT_OVERWRITE_TABLE.value(), WriteOperationType.INSERT_OVERWRITE.value()) + .withDocumentation("For SQL operations, if enables bulk_insert operation, " + + "this configure will take effect to decide overwrite whole table or partitions specified"); + /** * Returns if partition records are sorted or not. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 05f972875a9fc..656f53546c308 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -342,9 +342,9 @@ public void waitForCleaningFinish() { } @Override - protected List postWrite(HoodieWriteMetadata> result, - String instantTime, - HoodieTable hoodieTable) { + public List postWrite(HoodieWriteMetadata> result, + String instantTime, + HoodieTable hoodieTable) { if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis()); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 118685fb29a18..b74ce4213ead2 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -193,9 +193,9 @@ public List delete(List keys, } @Override - protected List postWrite(HoodieWriteMetadata> result, - String instantTime, - HoodieTable hoodieTable) { + public List postWrite(HoodieWriteMetadata> result, + String instantTime, + HoodieTable hoodieTable) { if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index c2a651bf4828e..28e0af788acc3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -256,9 +256,9 @@ public HoodieWriteResult deletePartitions(List partitions, String instan } @Override - protected JavaRDD postWrite(HoodieWriteMetadata> result, - String instantTime, - HoodieTable hoodieTable) { + public JavaRDD postWrite(HoodieWriteMetadata> result, + String instantTime, + HoodieTable hoodieTable) { if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis()); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index 719b61c8e25c1..eb8c77cc401c3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -29,6 +29,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.io.SingleFileHandleCreateFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ -77,8 +78,11 @@ public HoodieData performClusteringWithRecordsAsRow(Dataset in // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value. newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE)); - return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata); + BulkInsertPartitioner> partitioner = getRowPartitioner(strategyParams, schema); + Dataset repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups); + + return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig, + partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index b75c7277deaba..85ee7ec9d4b70 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ -69,8 +70,11 @@ public HoodieData performClusteringWithRecordsAsRow(Dataset in newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringMaxBytesInGroup())); - return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata); + BulkInsertPartitioner> partitioner = getRowPartitioner(strategyParams, schema); + Dataset repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups); + + return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig, + partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 3e5875d0b90b8..4bb3ab804abf0 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -147,13 +147,10 @@ object HoodieDatasetBulkInsertHelper instantTime: String, table: HoodieTable[_, _, _, _], writeConfig: HoodieWriteConfig, - partitioner: BulkInsertPartitioner[Dataset[Row]], - parallelism: Int, + arePartitionRecordsSorted: Boolean, shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = { - val repartitionedDataset = partitioner.repartitionRecords(dataset, parallelism) - val arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted val schema = dataset.schema - val writeStatuses = repartitionedDataset.queryExecution.toRdd.mapPartitions(iter => { + val writeStatuses = dataset.queryExecution.toRdd.mapPartitions(iter => { val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get val taskId = taskContextSupplier.getStageIdSupplier.get.toLong diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java new file mode 100644 index 0000000000000..4c0f6fa626781 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.commit; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.HoodieDatasetBulkInsertHelper; +import org.apache.hudi.client.HoodieWriteResult; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.execution.bulkinsert.BucketIndexBulkInsertPartitionerWithRows; +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory; +import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Serializable { + + protected final transient HoodieWriteConfig writeConfig; + protected final transient SparkRDDWriteClient writeClient; + protected final String instantTime; + protected HoodieTable table; + + public BaseDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, + SparkRDDWriteClient writeClient, + String instantTime) { + this.writeConfig = config; + this.writeClient = writeClient; + this.instantTime = instantTime; + } + + protected void preExecute() { + table.validateInsertSchema(); + writeClient.startCommitWithTime(instantTime, getCommitActionType()); + writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient()); + } + + protected abstract Option> doExecute(Dataset records, boolean arePartitionRecordsSorted); + + protected void afterExecute(HoodieWriteMetadata> result) { + writeClient.postWrite(result, instantTime, table); + } + + private HoodieWriteMetadata> buildHoodieWriteMetadata(Option> writeStatuses) { + return writeStatuses.map(statuses -> { + HoodieWriteMetadata> hoodieWriteMetadata = new HoodieWriteMetadata<>(); + hoodieWriteMetadata.setWriteStatuses(HoodieJavaRDD.getJavaRDD(statuses)); + hoodieWriteMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses)); + return hoodieWriteMetadata; + }).orElse(new HoodieWriteMetadata<>()); + } + + public final HoodieWriteResult execute(Dataset records, boolean isTablePartitioned) { + if (writeConfig.getBoolean(DataSourceWriteOptions.INSERT_DROP_DUPS())) { + throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet"); + } + + boolean populateMetaFields = writeConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS); + + BulkInsertPartitioner> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned); + boolean shouldDropPartitionColumns = writeConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS()); + Dataset hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, instantTime); + + table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime)); + preExecute(); + HoodieWriteMetadata> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted())); + afterExecute(result); + + return new HoodieWriteResult(result.getWriteStatuses(), result.getPartitionToReplaceFileIds()); + } + + public abstract WriteOperationType getWriteOperationType(); + + public String getCommitActionType() { + return CommitUtils.getCommitActionType(getWriteOperationType(), writeClient.getConfig().getTableType()); + } + + protected BulkInsertPartitioner> getPartitioner(boolean populateMetaFields, boolean isTablePartitioned) { + if (populateMetaFields) { + if (writeConfig.getIndexType() == HoodieIndex.IndexType.BUCKET) { + return new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault(), + writeConfig.getBucketIndexNumBuckets()); + } else { + return DataSourceUtils + .createUserDefinedBulkInsertPartitionerWithRows(writeConfig) + .orElseGet(() -> BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned)); + } + } else { + // Sort modes are not yet supported when meta fields are disabled + return new NonSortPartitionerWithRows(); + } + } + + protected Map> getPartitionToReplacedFileIds(HoodieData writeStatuses) { + return Collections.emptyMap(); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java new file mode 100644 index 0000000000000..4ebdce2e28309 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.commit; + +import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieInternalConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.DataSourceInternalWriterHelper; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor { + + public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, + SparkRDDWriteClient writeClient, + String instantTime) { + super(config, writeClient, instantTime); + } + + @Override + protected void preExecute() { + // no op + } + + @Override + protected Option> doExecute(Dataset records, boolean arePartitionRecordsSorted) { + Map opts = writeConfig.getProps().entrySet().stream().collect(Collectors.toMap( + e -> String.valueOf(e.getKey()), + e -> String.valueOf(e.getValue()))); + Map optsOverrides = Collections.singletonMap( + HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, String.valueOf(arePartitionRecordsSorted)); + + String targetFormat; + Map customOpts = new HashMap<>(1); + if (HoodieSparkUtils.isSpark2()) { + targetFormat = "org.apache.hudi.internal"; + } else if (HoodieSparkUtils.isSpark3()) { + targetFormat = "org.apache.hudi.spark3.internal"; + customOpts.put(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key(), records.schema().json()); + } else { + throw new HoodieException("Bulk insert using row writer is not supported with current Spark version." + + " To use row writer please switch to spark 2 or spark 3"); + } + + records.write().format(targetFormat) + .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) + .options(opts) + .options(customOpts) + .options(optsOverrides) + .mode(SaveMode.Append) + .save(); + return Option.empty(); + } + + @Override + protected void afterExecute(HoodieWriteMetadata> result) { + // no op + } + + @Override + public WriteOperationType getWriteOperationType() { + return WriteOperationType.BULK_INSERT; + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java new file mode 100644 index 0000000000000..a9f14d1e3e402 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.commit; + +import org.apache.hudi.HoodieDatasetBulkInsertHelper; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaPairRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor { + + public DatasetBulkInsertOverwriteCommitActionExecutor(HoodieWriteConfig config, + SparkRDDWriteClient writeClient, + String instantTime) { + super(config, writeClient, instantTime); + } + + @Override + protected Option> doExecute(Dataset records, boolean arePartitionRecordsSorted) { + table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, getCommitActionType(), instantTime), Option.empty()); + return Option.of(HoodieDatasetBulkInsertHelper + .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false)); + } + + @Override + public WriteOperationType getWriteOperationType() { + return WriteOperationType.INSERT_OVERWRITE; + } + + @Override + protected Map> getPartitionToReplacedFileIds(HoodieData writeStatuses) { + return HoodieJavaPairRDD.getJavaPairRDD(writeStatuses.map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath -> + Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap(); + } + + protected List getAllExistingFileIds(String partitionPath) { + // because new commit is not complete. it is safe to mark all existing file Ids as old files + return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList()); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java new file mode 100644 index 0000000000000..ad17abd294a76 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.commit; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaPairRDD; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class DatasetBulkInsertOverwriteTableCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor { + + public DatasetBulkInsertOverwriteTableCommitActionExecutor(HoodieWriteConfig config, + SparkRDDWriteClient writeClient, + String instantTime) { + super(config, writeClient, instantTime); + } + + @Override + public WriteOperationType getWriteOperationType() { + return WriteOperationType.INSERT_OVERWRITE_TABLE; + } + + @Override + protected Map> getPartitionToReplacedFileIds(HoodieData writeStatuses) { + HoodieEngineContext context = writeClient.getEngineContext(); + List partitionPaths = FSUtils.getAllPartitionPaths(context, writeConfig.getMetadataConfig(), + table.getMetaClient().getBasePathV2().toString()); + + if (partitionPaths == null || partitionPaths.isEmpty()) { + return Collections.emptyMap(); + } + + context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions"); + return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair( + partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap(); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 6756a484e0c17..2c0e95eabf2d5 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -22,8 +22,9 @@ import org.apache.avro.generic.GenericData import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys -import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace} +import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace} import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig +import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieWriterUtils._ @@ -32,6 +33,7 @@ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} +import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor} import org.apache.hudi.common.config._ import org.apache.hudi.common.engine.HoodieEngineContext import org.apache.hudi.common.fs.FSUtils @@ -44,10 +46,7 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, Option => HOption} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException} -import org.apache.hudi.execution.bulkinsert.{BucketIndexBulkInsertPartitionerWithRows, BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows} import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} -import org.apache.hudi.index.HoodieIndex.IndexType -import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils.reconcileNullability @@ -59,7 +58,6 @@ import org.apache.hudi.metrics.Metrics import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.sync.common.util.SyncUtilHelpers.getHoodieMetaSyncException -import org.apache.hudi.table.BulkInsertPartitioner import org.apache.hudi.util.SparkKeyGenUtils import org.apache.spark.api.java.{JavaSparkContext} import org.apache.spark.sql._ @@ -322,15 +320,6 @@ object HoodieSparkSqlWriter { validateSchemaForHoodieIsDeleted(writerSchema) mayBeValidateParamsForAutoGenerationOfRecordKeys(parameters, hoodieConfig) - // Short-circuit if bulk_insert via row is enabled. - // scalastyle:off - if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) { - val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName, - basePath, path, instantTime, writerSchema, tableConfig.isTablePartitioned) - return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) - } - // scalastyle:on - // Check whether partition columns should be persisted w/in the data-files, or should // be instead omitted from them and simply encoded into the partition path (which is Spark's // behavior by default) @@ -360,14 +349,6 @@ object HoodieSparkSqlWriter { // TODO(HUDI-4772) proper writer-schema has to be specified here DataSourceUtils.createHoodieClient(jsc, processedDataSchema.toString, path, tblName, mapAsJavaMap(finalOpts)) } - val writeConfig = client.getConfig - if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) { - throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.") - } - // Convert to RDD[HoodieRecord] - val hoodieRecords = - HoodieCreateRecordUtils.createHoodieRecordRdd(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, - processedDataSchema, operation, instantTime, isPrepped) if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) { asyncCompactionTriggerFn.get.apply(client) @@ -377,6 +358,23 @@ object HoodieSparkSqlWriter { asyncClusteringTriggerFn.get.apply(client) } + // Short-circuit if bulk_insert via row is enabled. + // scalastyle:off + if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) { + return bulkInsertAsRow(client, parameters, hoodieConfig, df, mode, tblName, basePath, + instantTime, writerSchema, tableConfig) + } + // scalastyle:on + + val writeConfig = client.getConfig + if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) { + throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.") + } + // Convert to RDD[HoodieRecord] + val hoodieRecords = + HoodieCreateRecordUtils.createHoodieRecordRdd(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, + processedDataSchema, operation, instantTime, isPrepped) + val dedupedHoodieRecords = if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) { DataSourceUtils.dropDuplicates(jsc, hoodieRecords, mapAsJavaMap(parameters)) @@ -792,71 +790,72 @@ object HoodieSparkSqlWriter { } } - def bulkInsertAsRow(sqlContext: SQLContext, + def bulkInsertAsRow(writeClient: SparkRDDWriteClient[_], + parameters: Map[String, String], hoodieConfig: HoodieConfig, df: DataFrame, + mode: SaveMode, tblName: String, basePath: Path, - path: String, instantTime: String, writerSchema: Schema, - isTablePartitioned: Boolean): (Boolean, common.util.Option[String]) = { + tableConfig: HoodieTableConfig): + (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) { throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet") } + val sqlContext = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext + val jsc = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext val writerSchemaStr = writerSchema.toString - val opts = hoodieConfig.getProps.toMap ++ + // Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty + val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr) - val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, path, tblName, mapAsJavaMap(opts)) - val populateMetaFields = hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS) - - val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) { - if (writeConfig.getIndexType == IndexType.BUCKET) { - new BucketIndexBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault, writeConfig.getBucketIndexNumBuckets) - } else { - val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig) - if (userDefinedBulkInsertPartitionerOpt.isPresent) { - userDefinedBulkInsertPartitionerOpt.get - } else { - BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned) - } - } - } else { - // Sort modes are not yet supported when meta fields are disabled - new NonSortPartitionerWithRows() + // Auto set the value of "hoodie.parquet.writelegacyformat.enabled" + tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema)) + val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath.toString, tblName, opts) + val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE)) + .map(WriteOperationType.fromValue) + .orNull + val executor = mode match { + case _ if overwriteOperationType == null => + // Don't need to overwrite + new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime) + case SaveMode.Append if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE => + // INSERT OVERWRITE PARTITION uses Append mode + new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient, instantTime) + case SaveMode.Overwrite if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE_TABLE => + new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient, instantTime) + case _ => + throw new HoodieException(s"$mode with bulk_insert in row writer path is not supported yet"); } - val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) - val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns, - instantTime) - - val optsOverrides = Map( - HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED -> - bulkInsertPartitionerRows.arePartitionRecordsSorted().toString - ) - - val (targetFormat, customOpts) = if (HoodieSparkUtils.isSpark2) { - ("org.apache.hudi.internal", Map()) - } else if (HoodieSparkUtils.isSpark3) { - ("org.apache.hudi.spark3.internal", Map( - HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key -> hoodieDF.schema.json - )) - } else { - throw new HoodieException("Bulk insert using row writer is not supported with current Spark version." - + " To use row writer please switch to spark 2 or spark 3") - } + val writeResult = executor.execute(df, tableConfig.isTablePartitioned) - hoodieDF.write.format(targetFormat) - .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime) - .options(opts ++ customOpts ++ optsOverrides) - .mode(SaveMode.Append) - .save() + try { + val (writeSuccessful, compactionInstant, clusteringInstant) = mode match { + case _ if overwriteOperationType == null => + val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema) + (syncHiveSuccess, HOption.empty().asInstanceOf[HOption[String]], HOption.empty().asInstanceOf[HOption[String]]) + case _ => + try { + commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc, + TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty) - val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema) - (syncHiveSuccess, common.util.Option.ofNullable(instantTime)) + } + } + (writeSuccessful, HOption.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig) + } finally { + // close the write client in all cases + val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration()) + val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters) + if (!asyncCompactionEnabled && !asyncClusteringEnabled) { + log.info("Closing write client") + writeClient.close() + } + } } def cleanup() : Unit = { @@ -880,6 +879,7 @@ object HoodieSparkSqlWriter { } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) { // When user set operation as INSERT_OVERWRITE_TABLE, // overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation + // TODO HUDI-6286 should not delete old data if using `Overwrite` mode log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.") fs.delete(tablePath, true) tableExists = false @@ -971,7 +971,7 @@ object HoodieSparkSqlWriter { jsc: JavaSparkContext, tableInstantInfo: TableInstantInfo, extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] - ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = { + ): (Boolean, HOption[java.lang.String], HOption[java.lang.String]) = { if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) { log.info("Proceeding to commit the write.") val metaMap = parameters.filter(kv => diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index 88ff9f304de2d..b68bad4d0f59e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -24,7 +24,7 @@ import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedPropertie import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, WriteOperationType} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME -import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig} +import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncConfigHolder, MultiPartKeysValueExtractor} import org.apache.hudi.keygen.ComplexKeyGenerator @@ -121,21 +121,23 @@ trait ProvidesHoodieConfig extends Logging { (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match { case (true, _, _, _, false, _) => throw new IllegalArgumentException(s"Table with primaryKey can only use bulk insert in non-strict mode.") - case (true, true, _, _, _, true) => - throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.") case (true, _, _, true, _, _) => throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." + s" Please disable $INSERT_DROP_DUPS and try again.") - // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table. - case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL + // Bulk insert with overwrite table + case (true, false, true, _, _, _) => + BULK_INSERT_OPERATION_OPT_VAL + // Bulk insert with overwrite table partition + case (true, true, false, _, _, true) => + BULK_INSERT_OPERATION_OPT_VAL // insert overwrite table case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL // insert overwrite partition - case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL + case (false, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode. case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode. - case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL + case (true, false, false, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL // for the rest case, use the insert operation case _ => INSERT_OPERATION_OPT_VAL } @@ -234,6 +236,17 @@ trait ProvidesHoodieConfig extends Logging { null } + val overwriteTableOpts = if (operation.equals(BULK_INSERT_OPERATION_OPT_VAL)) { + if (isOverwriteTable) { + Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE_TABLE.value()) + } else if (isOverwritePartition) { + Map(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.INSERT_OVERWRITE.value()) + } else { + Map() + } + } else { + Map() + } val overridingOpts = extraOptions ++ Map( "path" -> path, TABLE_TYPE.key -> tableType, @@ -244,7 +257,7 @@ trait ProvidesHoodieConfig extends Logging { RECORDKEY_FIELD.key -> recordKeyConfigValue, PRECOMBINE_FIELD.key -> preCombineField, PARTITIONPATH_FIELD.key -> partitionFieldsStr - ) + ) ++ overwriteTableOpts combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, defaultOpts = defaultOpts, overridingOpts = overridingOpts) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index c05a4055447f7..687bcaf0892e5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -193,6 +193,14 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { } } + protected def withTable(tableName: String)(f: String => Unit): Unit = { + try { + f(tableName) + } finally { + spark.sql(s"drop table if exists $tableName") + } + } + protected def withRecordType(recordConfig: Map[HoodieRecordType, Map[String, String]]=Map.empty)(f: => Unit) { // TODO HUDI-5264 Test parquet log with avro record in spark sql test Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK).foreach { recordType => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index a9fd0a4a03047..b51348679e5b0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -568,23 +568,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase { checkException(s"insert into $tableName2 values(1, 'a1', 10, 1000)")( "Table with primaryKey can only use bulk insert in non-strict mode." ) - - spark.sql("set hoodie.sql.insert.mode = non-strict") - val tableName3 = generateTableName - spark.sql( - s""" - |create table $tableName3 ( - | id int, - | name string, - | price double, - | dt string - |) using hudi - | tblproperties (primaryKey = 'id') - | partitioned by (dt) - """.stripMargin) - checkException(s"insert overwrite table $tableName3 partition(dt = '2021-07-18') values(1, 'a1', 10, '2021-07-18')")( - "Insert Overwrite Partition can not use bulk insert." - ) } } } @@ -616,138 +599,271 @@ class TestInsertTable extends HoodieSparkSqlTestBase { } } - test("Test bulk insert") { + test("Test bulk insert with insert into for single partitioned table") { withSQLConf("hoodie.sql.insert.mode" -> "non-strict") { withRecordType()(withTempDir { tmp => Seq("cow", "mor").foreach {tableType => - // Test bulk insert for single partition - val tableName = generateTableName - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | dt string - |) using hudi - | tblproperties ( - | type = '$tableType', - | primaryKey = 'id' - | ) - | partitioned by (dt) - | location '${tmp.getCanonicalPath}/$tableName' - """.stripMargin) - spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false") + withTable(generateTableName) { tableName => + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$tableName' + """.stripMargin) + spark.sql("set hoodie.datasource.write.insert.drop.duplicates = false") + + // Enable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = true") + spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')") + + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + } + checkAnswer(s"select id, name, price, dt from $tableName")( + Seq(1, "a1", 10.0, "2021-07-18") + ) - // Enable the bulk insert - spark.sql("set hoodie.sql.bulk.insert.enable = true") - spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')") + // Disable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = false") + spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')") - assertResult(WriteOperationType.BULK_INSERT) { - getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + assertResult(WriteOperationType.INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + } + checkAnswer(s"select id, name, price, dt from $tableName order by id")( + Seq(1, "a1", 10.0, "2021-07-18"), + Seq(2, "a2", 10.0, "2021-07-18") + ) } - checkAnswer(s"select id, name, price, dt from $tableName")( - Seq(1, "a1", 10.0, "2021-07-18") - ) + } + }) + } + } + + test("Test bulk insert with insert into for multi partitioned table") { + withSQLConf("hoodie.sql.insert.mode" -> "non-strict") { + withRecordType()(withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { tableMultiPartition => + spark.sql( + s""" + |create table $tableMultiPartition ( + | id int, + | name string, + | price double, + | dt string, + | hh string + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | partitioned by (dt, hh) + | location '${tmp.getCanonicalPath}/$tableMultiPartition' + """.stripMargin) - // Disable the bulk insert - spark.sql("set hoodie.sql.bulk.insert.enable = false") - spark.sql(s"insert into $tableName values(2, 'a2', 10, '2021-07-18')") + // Enable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = true") + spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, '2021-07-18', '12')") - assertResult(WriteOperationType.INSERT) { - getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableName").getOperationType + checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition")( + Seq(1, "a1", 10.0, "2021-07-18", "12") + ) + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableMultiPartition").getOperationType + } + // Disable the bulk insert + spark.sql("set hoodie.sql.bulk.insert.enable = false") + spark.sql(s"insert into $tableMultiPartition " + + s"values(2, 'a2', 10, '2021-07-18','12')") + + checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition order by id")( + Seq(1, "a1", 10.0, "2021-07-18", "12"), + Seq(2, "a2", 10.0, "2021-07-18", "12") + ) + assertResult(WriteOperationType.INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$tableMultiPartition").getOperationType + } } - checkAnswer(s"select id, name, price, dt from $tableName order by id")( - Seq(1, "a1", 10.0, "2021-07-18"), - Seq(2, "a2", 10.0, "2021-07-18") - ) + } + }) + } + } - // Test bulk insert for multi-level partition - val tableMultiPartition = generateTableName - spark.sql( - s""" - |create table $tableMultiPartition ( - | id int, - | name string, - | price double, - | dt string, - | hh string - |) using hudi - | tblproperties ( - | type = '$tableType', - | primaryKey = 'id' - | ) - | partitioned by (dt, hh) - | location '${tmp.getCanonicalPath}/$tableMultiPartition' - """.stripMargin) + test("Test bulk insert with insert into for non partitioned table") { + withSQLConf("hoodie.sql.insert.mode" -> "non-strict", + "hoodie.sql.bulk.insert.enable" -> "true") { + withRecordType()(withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { nonPartitionedTable => + spark.sql( + s""" + |create table $nonPartitionedTable ( + | id int, + | name string, + | price double + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | location '${tmp.getCanonicalPath}/$nonPartitionedTable' + """.stripMargin) + spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)") + checkAnswer(s"select id, name, price from $nonPartitionedTable")( + Seq(1, "a1", 10.0) + ) + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$nonPartitionedTable").getOperationType + } + } + } + }) + } + } - // Enable the bulk insert - spark.sql("set hoodie.sql.bulk.insert.enable = true") - spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, '2021-07-18', '12')") + test("Test bulk insert with CTAS") { + withSQLConf("hoodie.sql.insert.mode" -> "non-strict", + "hoodie.sql.bulk.insert.enable" -> "true") { + withRecordType()(withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { inputTable => + spark.sql( + s""" + |create table $inputTable ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$inputTable' + """.stripMargin) + spark.sql(s"insert into $inputTable values(1, 'a1', 10, '2021-07-18')") + + withTable(generateTableName) { target => + spark.sql( + s""" + |create table $target + |using hudi + |tblproperties( + | type = '$tableType', + | primaryKey = 'id' + |) + | location '${tmp.getCanonicalPath}/$target' + | as + | select * from $inputTable + |""".stripMargin) + checkAnswer(s"select id, name, price, dt from $target order by id")( + Seq(1, "a1", 10.0, "2021-07-18") + ) + assertResult(WriteOperationType.BULK_INSERT) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$target").getOperationType + } + } + } + } + }) + } + } - checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition")( - Seq(1, "a1", 10.0, "2021-07-18", "12") - ) - // Disable the bulk insert - spark.sql("set hoodie.sql.bulk.insert.enable = false") - spark.sql(s"insert into $tableMultiPartition " + - s"values(2, 'a2', 10, '2021-07-18','12')") - - checkAnswer(s"select id, name, price, dt, hh from $tableMultiPartition order by id")( - Seq(1, "a1", 10.0, "2021-07-18", "12"), - Seq(2, "a2", 10.0, "2021-07-18", "12") - ) - // Test bulk insert for non-partitioned table - val nonPartitionedTable = generateTableName - spark.sql( - s""" - |create table $nonPartitionedTable ( - | id int, - | name string, - | price double - |) using hudi - | tblproperties ( - | type = '$tableType', - | primaryKey = 'id' - | ) - | location '${tmp.getCanonicalPath}/$nonPartitionedTable' - """.stripMargin) - spark.sql("set hoodie.sql.bulk.insert.enable = true") - spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)") - checkAnswer(s"select id, name, price from $nonPartitionedTable")( - Seq(1, "a1", 10.0) - ) - spark.sql(s"insert overwrite table $nonPartitionedTable values(2, 'a2', 10)") - checkAnswer(s"select id, name, price from $nonPartitionedTable")( - Seq(2, "a2", 10.0) - ) - spark.sql("set hoodie.sql.bulk.insert.enable = false") + test("Test bulk insert with insert overwrite table") { + withSQLConf("hoodie.sql.insert.mode" -> "non-strict", + "hoodie.sql.bulk.insert.enable" -> "true") { + withRecordType()(withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { nonPartitionedTable => + spark.sql( + s""" + |create table $nonPartitionedTable ( + | id int, + | name string, + | price double + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | location '${tmp.getCanonicalPath}/$nonPartitionedTable' + """.stripMargin) + spark.sql(s"insert into $nonPartitionedTable values(1, 'a1', 10)") - // Test CTAS for bulk insert - val tableName2 = generateTableName - spark.sql( - s""" - |create table $tableName2 - |using hudi - |tblproperties( - | type = '$tableType', - | primaryKey = 'id' - |) - | location '${tmp.getCanonicalPath}/$tableName2' - | as - | select * from $tableName - |""".stripMargin) - checkAnswer(s"select id, name, price, dt from $tableName2 order by id")( - Seq(1, "a1", 10.0, "2021-07-18"), - Seq(2, "a2", 10.0, "2021-07-18") - ) + spark.sql(s"insert overwrite table $nonPartitionedTable values(2, 'b1', 11)") + checkAnswer(s"select id, name, price from $nonPartitionedTable order by id")( + Seq(2, "b1", 11.0) + ) + assertResult(WriteOperationType.INSERT_OVERWRITE_TABLE) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$nonPartitionedTable").getOperationType + } + } + } + }) + } + } + + test("Test bulk insert with insert overwrite partition") { + withSQLConf("hoodie.sql.insert.mode" -> "non-strict", + "hoodie.sql.bulk.insert.enable" -> "true") { + withRecordType()(withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + withTable(generateTableName) { partitionedTable => + spark.sql( + s""" + |create table $partitionedTable ( + | id int, + | name string, + | price double, + | dt string + |) using hudi + | tblproperties ( + | type = '$tableType', + | primaryKey = 'id' + | ) + | partitioned by (dt) + | location '${tmp.getCanonicalPath}/$partitionedTable' + """.stripMargin) + spark.sql(s"insert into $partitionedTable values(3, 'c1', 13, '2021-07-17')") + spark.sql(s"insert into $partitionedTable values(1, 'a1', 10, '2021-07-18')") + + // Insert overwrite a partition + spark.sql(s"insert overwrite table $partitionedTable partition(dt='2021-07-17') values(2, 'b1', 11)") + checkAnswer(s"select id, name, price, dt from $partitionedTable order by id")( + Seq(1, "a1", 10.0, "2021-07-18"), + Seq(2, "b1", 11.0, "2021-07-17") + ) + assertResult(WriteOperationType.INSERT_OVERWRITE) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$partitionedTable").getOperationType + } + + // Insert overwrite whole table + spark.sql(s"insert overwrite table $partitionedTable values(4, 'd1', 14, '2021-07-19')") + checkAnswer(s"select id, name, price, dt from $partitionedTable order by id")( + Seq(4, "d1", 14.0, "2021-07-19") + ) + assertResult(WriteOperationType.INSERT_OVERWRITE_TABLE) { + getLastCommitMetadata(spark, s"${tmp.getCanonicalPath}/$partitionedTable").getOperationType + } + } } }) } } test("Test combine before insert") { - withSQLConf("set hoodie.sql.bulk.insert.enable" -> "false") { + withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") { withRecordType()(withTempDir{tmp => val tableName = generateTableName spark.sql( diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala index 2ec000ee9804a..c16b8cae2f446 100644 --- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala +++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala @@ -106,8 +106,14 @@ private class HoodieV1WriteBuilder(writeOptions: CaseInsensitiveStringMap, override def toInsertableRelation: InsertableRelation = { new InsertableRelation { override def insert(data: DataFrame, overwrite: Boolean): Unit = { + val mode = if (overwriteTable || overwritePartition) { + SaveMode.Overwrite + } else { + SaveMode.Append + } + data.write.format("org.apache.hudi") - .mode(SaveMode.Append) + .mode(mode) .options(buildHoodieConfig(hoodieCatalogTable) ++ buildHoodieInsertConfig(hoodieCatalogTable, spark, overwritePartition, overwriteTable, Map.empty, Map.empty)) .save()