From 3be9d9ab07feb335ecfc599c21848cbf28286252 Mon Sep 17 00:00:00 2001 From: xiaoyuwei Date: Fri, 24 Dec 2021 10:38:25 +0800 Subject: [PATCH 1/3] [HUDI-3085] improve bulk insert partitioner abstraction, embedding the handling of fileIdPfx & write handle factory into partitioner --- .../hudi/table/BulkInsertPartitioner.java | 48 +++++++++++++++++-- .../action/commit/BaseBulkInsertHelper.java | 6 +-- .../run/strategy/JavaExecutionStrategy.java | 11 +++-- .../JavaSortAndSizeExecutionStrategy.java | 7 ++- .../JavaCustomColumnsSortPartitioner.java | 3 +- .../bulkinsert/JavaGlobalSortPartitioner.java | 3 +- .../bulkinsert/JavaNonSortPartitioner.java | 3 +- .../action/commit/JavaBulkInsertHelper.java | 22 ++++----- .../MultipleSparkJobExecutionStrategy.java | 5 +- .../SparkSingleFileSortExecutionStrategy.java | 7 ++- .../SparkSortAndSizeExecutionStrategy.java | 7 ++- .../bulkinsert/BulkInsertMapFunction.java | 15 +++--- .../bulkinsert/GlobalSortPartitioner.java | 3 +- .../GlobalSortPartitionerWithRows.java | 3 +- .../bulkinsert/NonSortPartitioner.java | 3 +- .../NonSortPartitionerWithRows.java | 3 +- .../PartitionSortPartitionerWithRows.java | 3 +- .../RDDCustomColumnsSortPartitioner.java | 3 +- .../RDDPartitionSortPartitioner.java | 3 +- .../RDDSpatialCurveSortPartitioner.java | 15 +++--- .../action/commit/SparkBulkInsertHelper.java | 29 +++++------ .../TestBulkInsertInternalPartitioner.java | 1 + .../org/apache/hudi/TestDataSourceUtils.java | 4 +- 23 files changed, 130 insertions(+), 77 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index fd1558a8232bb..ae87ade75b4f0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -18,24 +18,64 @@ package 
org.apache.hudi.table; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.io.WriteHandleFactory; + +import java.io.Serializable; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + /** * Repartition input records into at least expected number of output spark partitions. It should give below guarantees - * Output spark partition will have records from only one hoodie partition. - Average records per output spark * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews. */ -public interface BulkInsertPartitioner { +public abstract class BulkInsertPartitioner implements Serializable { + + private WriteHandleFactory defaultWriteHandleFactory; + private List fileIdPfx; /** - * Repartitions the input records into at least expected number of output spark partitions. + * Repartitions the input records into at least expected number of output spark partitions, + * and generates fileIdPfx for each partition. * * @param records Input Hoodie records * @param outputSparkPartitions Expected number of output partitions * @return */ - I repartitionRecords(I records, int outputSparkPartitions); + public abstract I repartitionRecords(I records, int outputSparkPartitions); /** * @return {@code true} if the records within a partition are sorted; {@code false} otherwise. */ - boolean arePartitionRecordsSorted(); + public abstract boolean arePartitionRecordsSorted(); + + public List getFileIdPfx() { + return fileIdPfx; + } + + public void setDefaultWriteHandleFactory(WriteHandleFactory defaultWriteHandleFactory) { + this.defaultWriteHandleFactory = defaultWriteHandleFactory; + } + + /** + * Return write handle factory for the given partition. 
+ * By default, return the pre-assigned write handle factory for all partitions + * @param partition data partition + * @return + */ + public WriteHandleFactory getWriteHandleFactory(int partition) { + return defaultWriteHandleFactory; + } + + /** + * Initialize a list of file id prefix randomly. + * In most cases, bulk_insert put all incoming records to randomly generated file groups (i.e., the current default implementation). + * @param parallelism the number of output file id + * @return lists of file groups, the Nth element corresponds to partition N + */ + protected void generateFileIdPfx(int parallelism) { + fileIdPfx = IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList()); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java index ad2145c3501bf..325439f99db55 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -42,8 +41,7 @@ public abstract HoodieWriteMetadata bulkInsert(I inputRecords, String instant public abstract O bulkInsert(I inputRecords, String instantTime, HoodieTable table, HoodieWriteConfig config, boolean performDedupe, - Option userDefinedBulkInsertPartitioner, + BulkInsertPartitioner partitioner, boolean addMetadataFields, - int parallelism, - WriteHandleFactory 
writeHandleFactory); + int parallelism); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index 7d7609f0fa0a9..233c70ecf9eb6 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -41,6 +41,7 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner; import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieFileReader; @@ -121,16 +122,16 @@ public abstract List performClusteringWithRecordList( * * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. * @param schema Schema of the data including metadata fields. - * @return empty for now. 
+ * @return partitioner for the java engine */ - protected Option>>> getPartitioner(Map strategyParams, Schema schema) { + protected BulkInsertPartitioner>> getPartitioner(Map strategyParams, Schema schema) { if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { - return Option.of(new JavaCustomColumnsSortPartitioner( + return new JavaCustomColumnsSortPartitioner( strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), HoodieAvroUtils.addMetadataFields(schema), - getWriteConfig().isConsistentLogicalTimestampEnabled())); + getWriteConfig().isConsistentLogicalTimestampEnabled()); } else { - return Option.empty(); + return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java index d34673c2d9b9a..9059f38241438 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java @@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.JavaBulkInsertHelper; @@ -65,7 +66,11 @@ public List performClusteringWithRecordList( .withEngineType(EngineType.JAVA) .withProps(getWriteConfig().getProps()).build(); newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); + + BulkInsertPartitioner partitioner = getPartitioner(strategyParams, 
schema); + partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(preserveHoodieMetadata)); + return (List) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); + false, partitioner, true, numOutputGroups); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java index eb3d4ef312e99..01c5dd0308be2 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -35,7 +35,7 @@ * @param HoodieRecordPayload type */ public class JavaCustomColumnsSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { private final String[] sortColumnNames; private final Schema schema; @@ -50,6 +50,7 @@ public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boo @Override public List> repartitionRecords( List> records, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); return records.stream().sorted((o1, o2) -> { Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled); Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java index fded0ffab51bd..2bde4e583304f 100644 --- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java @@ -33,11 +33,12 @@ * @param HoodieRecordPayload type */ public class JavaGlobalSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { @Override public List> repartitionRecords(List> records, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); // Now, sort the records and line them up nicely for loading. records.sort(new Comparator() { @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java index b40459d838444..99dd393b24b4d 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java @@ -31,11 +31,12 @@ * @param HoodieRecordPayload type */ public class JavaNonSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { @Override public List> repartitionRecords(List> records, int outputPartitions) { + generateFileIdPfx(outputPartitions); return records; } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 39b2916732f2a..525fc7155c31b 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -30,7 +30,6 @@ import 
org.apache.hudi.execution.JavaLazyInsertIterable; import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory; import org.apache.hudi.io.CreateHandleFactory; -import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.FileIdPrefixProvider; import org.apache.hudi.table.HoodieTable; @@ -77,8 +76,13 @@ public HoodieWriteMetadata> bulkInsert(final List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); + List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism()); //update index ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -90,10 +94,9 @@ public List bulkInsert(List> inputRecords, HoodieTable>, List, List> table, HoodieWriteConfig config, boolean performDedupe, - Option userDefinedBulkInsertPartitioner, + BulkInsertPartitioner partitioner, boolean useWriterSchema, - int parallelism, - WriteHandleFactory writeHandleFactory) { + int parallelism) { // De-dupe/merge if needed List> dedupedRecords = inputRecords; @@ -103,12 +106,7 @@ public List bulkInsert(List> inputRecords, parallelism, table); } - final List> repartitionedRecords; - BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() - ? userDefinedBulkInsertPartitioner.get() - : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); - // only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. 
To improve this, TODO HUDI-3463 - repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); + final List> repartitionedRecords = (List>) partitioner.repartitionRecords(dedupedRecords, parallelism); FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass( config.getFileIdPrefixProviderClassName(), @@ -119,7 +117,7 @@ public List bulkInsert(List> inputRecords, new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(), - new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll); + partitioner.getWriteHandleFactory(0)).forEachRemaining(writeStatuses::addAll); return writeStatuses; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 5a03cdf3bc9a1..e09457f0e5135 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -46,6 +46,7 @@ import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner; import org.apache.hudi.io.IOUtils; @@ -137,7 +138,7 @@ public abstract HoodieData performClusteringWithRecordsRDD(final Ho * @param schema Schema of the data including metadata fields. 
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty. */ - protected Option>>> getPartitioner(Map strategyParams, Schema schema) { + protected BulkInsertPartitioner>> getPartitioner(Map strategyParams, Schema schema) { Option orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key())) .map(listStr -> listStr.split(",")); @@ -159,7 +160,7 @@ protected Option>>> getPartitioner default: throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy)); } - }); + }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode())); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index 4a7ee7bceeacd..b69773b25bd54 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -29,6 +29,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.io.SingleFileHandleCreateFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ -72,7 +73,11 @@ public HoodieData performClusteringWithRecordsRDD(HoodieData) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata)); 
+ false, partitioner, true, numOutputGroups); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index 7db63d4169fc1..0dd6e0bc76ea6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ -62,7 +63,11 @@ public HoodieData performClusteringWithRecordsRDD(final HoodieData< .withBulkInsertParallelism(numOutputGroups) .withProps(getWriteConfig().getProps()).build(); newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); + + BulkInsertPartitioner partitioner = getPartitioner(strategyParams, schema); + partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(preserveHoodieMetadata)); + return (HoodieData) SparkBulkInsertHelper.newInstance() - .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); + .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, partitioner, true, numOutputGroups); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index 24cdd70603cbe..51febb8ef4340 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.SparkLazyInsertIterable; -import org.apache.hudi.io.WriteHandleFactory; +import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.spark.api.java.function.Function2; @@ -41,27 +41,24 @@ public class BulkInsertMapFunction private boolean areRecordsSorted; private HoodieWriteConfig config; private HoodieTable hoodieTable; - private List fileIDPrefixes; private boolean useWriterSchema; - private WriteHandleFactory writeHandleFactory; + private BulkInsertPartitioner partitioner; public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, HoodieWriteConfig config, HoodieTable hoodieTable, - List fileIDPrefixes, boolean useWriterSchema, - WriteHandleFactory writeHandleFactory) { + boolean useWriterSchema, BulkInsertPartitioner partitioner) { this.instantTime = instantTime; this.areRecordsSorted = areRecordsSorted; this.config = config; this.hoodieTable = hoodieTable; - this.fileIDPrefixes = fileIDPrefixes; this.useWriterSchema = useWriterSchema; - this.writeHandleFactory = writeHandleFactory; + this.partitioner = partitioner; } @Override public Iterator> call(Integer partition, Iterator> recordItr) { return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, - fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema, - writeHandleFactory); + (String)partitioner.getFileIdPfx().get(partition), 
hoodieTable.getTaskContextSupplier(), useWriterSchema, + partitioner.getWriteHandleFactory(partition)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java index a184c009a1b45..35a6e82940c0d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java @@ -32,11 +32,12 @@ * @param HoodieRecordPayload type */ public class GlobalSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); // Now, sort the records and line them up nicely for loading. return records.sortBy(record -> { // Let's use "partitionPath + key" as the sort key. Spark, will ensure diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java index 24bcc0aff0df3..1248e899ef6b2 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java @@ -28,10 +28,11 @@ /** * A built-in partitioner that does global sorting for the input Rows across partitions after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode. 
*/ -public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner> { +public class GlobalSortPartitionerWithRows extends BulkInsertPartitioner> { @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); // Now, sort the records and line them up nicely for loading. // Let's use "partitionPath + key" as the sort key. return rows.sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD)) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java index 19c90ecb1a012..67e4428730ebc 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java @@ -31,11 +31,12 @@ * @param HoodieRecordPayload type */ public class NonSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); return records.coalesce(outputSparkPartitions); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java index e1c34a8f84062..aca5b73f286ed 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java @@ -28,10 +28,11 @@ * corresponding to the {@code BulkInsertSortMode.NONE} mode. 
* */ -public class NonSortPartitionerWithRows implements BulkInsertPartitioner> { +public class NonSortPartitionerWithRows extends BulkInsertPartitioner> { @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); return rows.coalesce(outputSparkPartitions); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java index b669c338f8668..e2a0e0ea5f5ae 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java @@ -27,10 +27,11 @@ /** * A built-in partitioner that does local sorting for each spark partitions after coalesce for bulk insert operation, corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode. 
*/ -public class PartitionSortPartitionerWithRows implements BulkInsertPartitioner> { +public class PartitionSortPartitionerWithRows extends BulkInsertPartitioner> { @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); return rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 2fe6fe969c482..1fbea6d488530 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -35,7 +35,7 @@ * @param HoodieRecordPayload type */ public class RDDCustomColumnsSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { private final String[] sortColumnNames; private final SerializableSchema serializableSchema; @@ -56,6 +56,7 @@ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, bool @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); final String[] sortColumns = this.sortColumnNames; final SerializableSchema schema = this.serializableSchema; final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java index 9526ad5856469..b6408de1fd810 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java @@ -38,11 +38,12 @@ * @param HoodieRecordPayload type */ public class RDDPartitionSortPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { + generateFileIdPfx(outputSparkPartitions); return records.coalesce(outputSparkPartitions) .mapToPair(record -> new Tuple2<>( diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java index 219fb0b165972..103312fc0afcd 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java @@ -46,12 +46,11 @@ * support z-curve optimization, hilbert will come soon. 
* @param HoodieRecordPayload type */ -public class RDDSpatialCurveSortPartitioner - implements BulkInsertPartitioner>> { +public class RDDSpatialCurveSortPartitioner extends BulkInsertPartitioner>> { - private final HoodieSparkEngineContext sparkEngineContext; + private final transient HoodieSparkEngineContext sparkEngineContext; private final String[] orderByColumns; - private final Schema schema; + private final SerializableSchema schema; private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; @@ -64,14 +63,14 @@ public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContex this.orderByColumns = orderByColumns; this.layoutOptStrategy = layoutOptStrategy; this.curveCompositionStrategyType = curveCompositionStrategyType; - this.schema = schema; + this.schema = new SerializableSchema(schema); } @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - SerializableSchema serializableSchema = new SerializableSchema(schema); + generateFileIdPfx(outputSparkPartitions); JavaRDD genericRecordsRDD = - records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get()); + records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get()); Dataset sourceDataset = AvroConversionUtils.createDataFrame( @@ -82,7 +81,7 @@ public JavaRDD> repartitionRecords(JavaRDD> reco Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions); - return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty()) + return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty()) .toJavaRDD() .map(record -> { String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 38e38101b0d02..82daa771feda7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -20,7 +20,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -31,7 +30,6 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction; import org.apache.hudi.io.CreateHandleFactory; -import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -39,8 +37,6 @@ import org.apache.spark.api.java.JavaRDD; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * A spark implementation of {@link BaseBulkInsertHelper}. @@ -76,9 +72,15 @@ public HoodieWriteMetadata> bulkInsert(final HoodieData< table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, executor.getCommitActionType(), instantTime), Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); + + BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() + ? 
userDefinedBulkInsertPartitioner.get() + : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); + partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(false)); + // write new files HoodieData writeStatuses = - bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); + bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism()); //update index ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -90,10 +92,9 @@ public HoodieData bulkInsert(HoodieData> inputRecor HoodieTable>, HoodieData, HoodieData> table, HoodieWriteConfig config, boolean performDedupe, - Option userDefinedBulkInsertPartitioner, + BulkInsertPartitioner partitioner, boolean useWriterSchema, - int parallelism, - WriteHandleFactory writeHandleFactory) { + int parallelism) { // De-dupe/merge if needed HoodieData> dedupedRecords = inputRecords; @@ -103,20 +104,12 @@ public HoodieData bulkInsert(HoodieData> inputRecor parallelism, table); } - final HoodieData> repartitionedRecords; - BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() - ? userDefinedBulkInsertPartitioner.get() - : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. 
To improve this, TODO HUDI-3463 - repartitionedRecords = HoodieJavaRDD.of((JavaRDD>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism)); - - // generate new file ID prefixes for each output partition - final List fileIDPrefixes = - IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList()); + final HoodieData> repartitionedRecords = HoodieJavaRDD.of((JavaRDD>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism)); JavaRDD writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords) .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime, - partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true) + partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner), true) .flatMap(List::iterator); return HoodieJavaRDD.of(writeStatusRDD); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 4d2f5e0c5e229..33e6fb2126104 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -108,6 +108,7 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner JavaRDD> actualRecords = (JavaRDD>) partitioner.repartitionRecords(records, numPartitions); assertEquals(numPartitions, actualRecords.getNumPartitions()); + assertEquals(numPartitions, partitioner.getFileIdPfx().size()); List> collectedActualRecords = actualRecords.collect(); if (isGloballySorted) { // Verify global order diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index af5bbe7717959..7bc5b1e8c4429 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -281,7 +281,7 @@ private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName } public static class NoOpBulkInsertPartitioner - implements BulkInsertPartitioner>> { + extends BulkInsertPartitioner>> { public NoOpBulkInsertPartitioner(HoodieWriteConfig config) {} @@ -297,7 +297,7 @@ public boolean arePartitionRecordsSorted() { } public static class NoOpBulkInsertPartitionerRows - implements BulkInsertPartitioner> { + extends BulkInsertPartitioner> { public NoOpBulkInsertPartitionerRows(HoodieWriteConfig config) {} From a4f82cf5595ed530699182348ecc7abcbdd7f8d9 Mon Sep 17 00:00:00 2001 From: xiaoyuwei Date: Sat, 26 Feb 2022 21:40:52 +0800 Subject: [PATCH 2/3] [HUDI-3085] review: better and clear interface --- .../hudi/table/BulkInsertPartitioner.java | 47 +++++++------------ .../action/commit/BaseBulkInsertHelper.java | 4 +- .../JavaSortAndSizeExecutionStrategy.java | 6 +-- .../JavaCustomColumnsSortPartitioner.java | 3 +- .../bulkinsert/JavaGlobalSortPartitioner.java | 3 +- .../bulkinsert/JavaNonSortPartitioner.java | 3 +- .../action/commit/JavaBulkInsertHelper.java | 9 ++-- .../SparkSingleFileSortExecutionStrategy.java | 6 +-- .../SparkSortAndSizeExecutionStrategy.java | 7 +-- .../bulkinsert/BulkInsertMapFunction.java | 10 ++-- .../bulkinsert/GlobalSortPartitioner.java | 3 +- .../GlobalSortPartitionerWithRows.java | 3 +- .../bulkinsert/NonSortPartitioner.java | 3 +- .../NonSortPartitionerWithRows.java | 3 +- .../PartitionSortPartitionerWithRows.java | 3 +- .../RDDCustomColumnsSortPartitioner.java | 3 +- .../RDDPartitionSortPartitioner.java | 
3 +- .../RDDSpatialCurveSortPartitioner.java | 4 +- .../action/commit/SparkBulkInsertHelper.java | 9 ++-- .../TestBulkInsertInternalPartitioner.java | 1 - .../org/apache/hudi/TestDataSourceUtils.java | 4 +- 21 files changed, 54 insertions(+), 83 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index ae87ade75b4f0..6e2cb31809a47 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -19,63 +19,50 @@ package org.apache.hudi.table; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.io.WriteHandleFactory; import java.io.Serializable; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * Repartition input records into at least expected number of output spark partitions. It should give below guarantees - * Output spark partition will have records from only one hoodie partition. - Average records per output spark * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews. */ -public abstract class BulkInsertPartitioner implements Serializable { - - private WriteHandleFactory defaultWriteHandleFactory; - private List fileIdPfx; +public interface BulkInsertPartitioner extends Serializable { /** - * Repartitions the input records into at least expected number of output spark partitions, - * and generates fileIdPfx for each partition. + * Repartitions the input records into at least expected number of output spark partitions. 
 * * @param records Input Hoodie records * @param outputSparkPartitions Expected number of output partitions * @return */ - public abstract I repartitionRecords(I records, int outputSparkPartitions); + I repartitionRecords(I records, int outputSparkPartitions); /** * @return {@code true} if the records within a partition are sorted; {@code false} otherwise. */ - public abstract boolean arePartitionRecordsSorted(); - - public List getFileIdPfx() { - return fileIdPfx; - } - - public void setDefaultWriteHandleFactory(WriteHandleFactory defaultWriteHandleFactory) { - this.defaultWriteHandleFactory = defaultWriteHandleFactory; - } + boolean arePartitionRecordsSorted(); /** - * Return write handle factory for the given partition. - * By default, return the pre-assigned write handle factory for all partitions - * @param partition data partition + * Return file group id prefix for the given data partition. + * By default, return a new file group id prefix, so that incoming records will route to a fresh new file group + * @param partitionId data partition * @return */ - public WriteHandleFactory getWriteHandleFactory(int partition) { - return defaultWriteHandleFactory; + default String getFileIdPfx(int partitionId) { + return FSUtils.createNewFileIdPfx(); } /** - * Initialize a list of file id prefix randomly. - * In most cases, bulk_insert put all incoming records to randomly generated file groups (i.e., the current default implementation). - * @param parallelism the number of output file id - * @return lists of file groups, the Nth element corresponds to partition N + * Return write handle factory for the given partition. 
+ * By default, return CreateHandleFactory which will always write to a new file group + * @param partitionId data partition + * @return */ - protected void generateFileIdPfx(int parallelism) { - fileIdPfx = IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList()); + default Option getWriteHandleFactory(int partitionId) { + return Option.empty(); } + } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java index 325439f99db55..5355194ff75bf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -43,5 +44,6 @@ public abstract O bulkInsert(I inputRecords, String instantTime, boolean performDedupe, BulkInsertPartitioner partitioner, boolean addMetadataFields, - int parallelism); + int parallelism, + WriteHandleFactory writeHandleFactory); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java index 9059f38241438..d8099b60eeda3 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java +++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java @@ -28,7 +28,6 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; -import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.JavaBulkInsertHelper; @@ -67,10 +66,7 @@ public List performClusteringWithRecordList( .withProps(getWriteConfig().getProps()).build(); newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); - BulkInsertPartitioner partitioner = getPartitioner(strategyParams, schema); - partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(preserveHoodieMetadata)); - return (List) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, partitioner, true, numOutputGroups); + false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java index 01c5dd0308be2..eb3d4ef312e99 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java @@ -35,7 +35,7 @@ * @param HoodieRecordPayload type */ public class JavaCustomColumnsSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { private final String[] sortColumnNames; private final Schema schema; @@ -50,7 +50,6 @@ public 
JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, boo @Override public List> repartitionRecords( List> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); return records.stream().sorted((o1, o2) -> { Object values1 = HoodieAvroUtils.getRecordColumnValues(o1, sortColumnNames, schema, consistentLogicalTimestampEnabled); Object values2 = HoodieAvroUtils.getRecordColumnValues(o2, sortColumnNames, schema, consistentLogicalTimestampEnabled); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java index 2bde4e583304f..fded0ffab51bd 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaGlobalSortPartitioner.java @@ -33,12 +33,11 @@ * @param HoodieRecordPayload type */ public class JavaGlobalSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { @Override public List> repartitionRecords(List> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); // Now, sort the records and line them up nicely for loading. 
records.sort(new Comparator() { @Override diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java index 99dd393b24b4d..b40459d838444 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaNonSortPartitioner.java @@ -31,12 +31,11 @@ * @param HoodieRecordPayload type */ public class JavaNonSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { @Override public List> repartitionRecords(List> records, int outputPartitions) { - generateFileIdPfx(outputPartitions); return records; } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 525fc7155c31b..9833eb60fb64d 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -30,6 +30,7 @@ import org.apache.hudi.execution.JavaLazyInsertIterable; import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory; import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.FileIdPrefixProvider; import org.apache.hudi.table.HoodieTable; @@ -79,10 +80,9 @@ public HoodieWriteMetadata> bulkInsert(final List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism()); + List writeStatuses = 
bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -96,7 +96,8 @@ public List bulkInsert(List> inputRecords, boolean performDedupe, BulkInsertPartitioner partitioner, boolean useWriterSchema, - int parallelism) { + int parallelism, + WriteHandleFactory writeHandleFactory) { // De-dupe/merge if needed List> dedupedRecords = inputRecords; @@ -117,7 +118,7 @@ public List bulkInsert(List> inputRecords, new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(), - partitioner.getWriteHandleFactory(0)).forEachRemaining(writeStatuses::addAll); + (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll); return writeStatuses; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index b69773b25bd54..b61017c34ce41 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -29,7 +29,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.io.SingleFileHandleCreateFactory; -import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ 
-73,11 +72,8 @@ public HoodieData performClusteringWithRecordsRDD(HoodieData) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, partitioner, true, numOutputGroups); + false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index 0dd6e0bc76ea6..7db63d4169fc1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -27,7 +27,6 @@ import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.CreateHandleFactory; -import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.SparkBulkInsertHelper; @@ -63,11 +62,7 @@ public HoodieData performClusteringWithRecordsRDD(final HoodieData< .withBulkInsertParallelism(numOutputGroups) .withProps(getWriteConfig().getProps()).build(); newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); - - BulkInsertPartitioner partitioner = getPartitioner(strategyParams, schema); - partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(preserveHoodieMetadata)); - return (HoodieData) SparkBulkInsertHelper.newInstance() - .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, partitioner, true, numOutputGroups); + .bulkInsert(inputRecords, 
instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java index 51febb8ef4340..66c3bdddcb1ef 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.execution.SparkLazyInsertIterable; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; @@ -43,22 +44,25 @@ public class BulkInsertMapFunction private HoodieTable hoodieTable; private boolean useWriterSchema; private BulkInsertPartitioner partitioner; + private WriteHandleFactory writeHandleFactory; public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted, HoodieWriteConfig config, HoodieTable hoodieTable, - boolean useWriterSchema, BulkInsertPartitioner partitioner) { + boolean useWriterSchema, BulkInsertPartitioner partitioner, + WriteHandleFactory writeHandleFactory) { this.instantTime = instantTime; this.areRecordsSorted = areRecordsSorted; this.config = config; this.hoodieTable = hoodieTable; this.useWriterSchema = useWriterSchema; + this.writeHandleFactory = writeHandleFactory; this.partitioner = partitioner; } @Override public Iterator> call(Integer partition, Iterator> recordItr) { return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable, - (String)partitioner.getFileIdPfx().get(partition), 
hoodieTable.getTaskContextSupplier(), useWriterSchema, - partitioner.getWriteHandleFactory(partition)); + partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema, + (WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java index 35a6e82940c0d..a184c009a1b45 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java @@ -32,12 +32,11 @@ * @param HoodieRecordPayload type */ public class GlobalSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); // Now, sort the records and line them up nicely for loading. return records.sortBy(record -> { // Let's use "partitionPath + key" as the sort key. 
Spark, will ensure diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java index 1248e899ef6b2..24bcc0aff0df3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java @@ -28,11 +28,10 @@ /** * A built-in partitioner that does global sorting for the input Rows across partitions after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode. */ -public class GlobalSortPartitionerWithRows extends BulkInsertPartitioner> { +public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner> { @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); // Now, sort the records and line them up nicely for loading. // Let's use "partitionPath + key" as the sort key. 
return rows.sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD)) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java index 67e4428730ebc..19c90ecb1a012 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java @@ -31,12 +31,11 @@ * @param HoodieRecordPayload type */ public class NonSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); return records.coalesce(outputSparkPartitions); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java index aca5b73f286ed..e1c34a8f84062 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java @@ -28,11 +28,10 @@ * corresponding to the {@code BulkInsertSortMode.NONE} mode. 
* */ -public class NonSortPartitionerWithRows extends BulkInsertPartitioner> { +public class NonSortPartitionerWithRows implements BulkInsertPartitioner> { @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); return rows.coalesce(outputSparkPartitions); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java index e2a0e0ea5f5ae..b669c338f8668 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java @@ -27,11 +27,10 @@ /** * A built-in partitioner that does local sorting for each spark partitions after coalesce for bulk insert operation, corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode. 
*/ -public class PartitionSortPartitionerWithRows extends BulkInsertPartitioner> { +public class PartitionSortPartitionerWithRows implements BulkInsertPartitioner> { @Override public Dataset repartitionRecords(Dataset rows, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); return rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 1fbea6d488530..2fe6fe969c482 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -35,7 +35,7 @@ * @param HoodieRecordPayload type */ public class RDDCustomColumnsSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { private final String[] sortColumnNames; private final SerializableSchema serializableSchema; @@ -56,7 +56,6 @@ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, bool @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); final String[] sortColumns = this.sortColumnNames; final SerializableSchema schema = this.serializableSchema; final boolean consistentLogicalTimestampEnabled = this.consistentLogicalTimestampEnabled; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java index b6408de1fd810..9526ad5856469 100644 --- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java @@ -38,12 +38,11 @@ * @param HoodieRecordPayload type */ public class RDDPartitionSortPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); return records.coalesce(outputSparkPartitions) .mapToPair(record -> new Tuple2<>( diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java index 103312fc0afcd..50a0a534f881b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java @@ -46,7 +46,8 @@ * support z-curve optimization, hilbert will come soon. 
* @param HoodieRecordPayload type */ -public class RDDSpatialCurveSortPartitioner extends BulkInsertPartitioner>> { +public class RDDSpatialCurveSortPartitioner + implements BulkInsertPartitioner>> { private final transient HoodieSparkEngineContext sparkEngineContext; private final String[] orderByColumns; @@ -68,7 +69,6 @@ public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContex @Override public JavaRDD> repartitionRecords(JavaRDD> records, int outputSparkPartitions) { - generateFileIdPfx(outputSparkPartitions); JavaRDD genericRecordsRDD = records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get()); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 82daa771feda7..55fb879eb4d96 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -30,6 +30,7 @@ import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction; import org.apache.hudi.io.CreateHandleFactory; +import org.apache.hudi.io.WriteHandleFactory; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -76,11 +77,10 @@ public HoodieWriteMetadata> bulkInsert(final HoodieData< BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() ? 
userDefinedBulkInsertPartitioner.get() : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); - partitioner.setDefaultWriteHandleFactory(new CreateHandleFactory(false)); // write new files HoodieData writeStatuses = - bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism()); + bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -94,7 +94,8 @@ public HoodieData bulkInsert(HoodieData> inputRecor boolean performDedupe, BulkInsertPartitioner partitioner, boolean useWriterSchema, - int parallelism) { + int parallelism, + WriteHandleFactory writeHandleFactory) { // De-dupe/merge if needed HoodieData> dedupedRecords = inputRecords; @@ -109,7 +110,7 @@ public HoodieData bulkInsert(HoodieData> inputRecor JavaRDD writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords) .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime, - partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner), true) + partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true) .flatMap(List::iterator); return HoodieJavaRDD.of(writeStatusRDD); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index 33e6fb2126104..4d2f5e0c5e229 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -108,7 +108,6 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner JavaRDD> actualRecords = (JavaRDD>) partitioner.repartitionRecords(records, numPartitions); assertEquals(numPartitions, actualRecords.getNumPartitions()); - assertEquals(numPartitions, partitioner.getFileIdPfx().size()); List> collectedActualRecords = actualRecords.collect(); if (isGloballySorted) { // Verify global order diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 7bc5b1e8c4429..af5bbe7717959 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -281,7 +281,7 @@ private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName } public static class NoOpBulkInsertPartitioner - extends BulkInsertPartitioner>> { + implements BulkInsertPartitioner>> { public NoOpBulkInsertPartitioner(HoodieWriteConfig config) {} @@ -297,7 +297,7 @@ public boolean arePartitionRecordsSorted() { } public static class NoOpBulkInsertPartitionerRows - extends BulkInsertPartitioner> { + implements BulkInsertPartitioner> { public NoOpBulkInsertPartitionerRows(HoodieWriteConfig config) {} From de00fb3133f06340c3bbac590dee2a19ec531748 Mon Sep 17 00:00:00 2001 From: xiaoyuwei Date: Sun, 24 Apr 2022 10:18:06 +0800 Subject: [PATCH 3/3] [HUDI-3085] review fix --- .../java/org/apache/hudi/table/BulkInsertPartitioner.java | 1 - .../run/strategy/JavaSortAndSizeExecutionStrategy.java | 1 - .../hudi/table/action/commit/JavaBulkInsertHelper.java | 8 ++++---- .../hudi/table/action/commit/SparkBulkInsertHelper.java | 8 +++----- 4 files changed, 7 insertions(+), 11 deletions(-) diff 
--git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 6e2cb31809a47..63b502531a896 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -57,7 +57,6 @@ default String getFileIdPfx(int partitionId) { /** * Return write handle factory for the given partition. - * By default, return CreateHandleFactory which will always write to a new file group * @param partitionId data partition * @return */ diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java index d8099b60eeda3..d34673c2d9b9a 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java @@ -65,7 +65,6 @@ public List performClusteringWithRecordList( .withEngineType(EngineType.JAVA) .withProps(getWriteConfig().getProps()).build(); newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); - return (List) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java index 9833eb60fb64d..e126372aa9068 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java @@ -77,12 +77,11 @@ public HoodieWriteMetadata> bulkInsert(final List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); + List writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, + config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result; @@ -118,6 +117,7 @@ public List bulkInsert(List> inputRecords, new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table, fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(), + // Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine. 
(WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll); return writeStatuses; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 55fb879eb4d96..1652c35eb63e6 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -74,13 +74,11 @@ public HoodieWriteMetadata> bulkInsert(final HoodieData< executor.getCommitActionType(), instantTime), Option.empty(), config.shouldAllowMultiWriteOnSameInstant()); - BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent() - ? userDefinedBulkInsertPartitioner.get() - : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()); + BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode())); // write new files - HoodieData writeStatuses = - bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); + HoodieData writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false, + config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false)); //update index ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result); return result;