diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index fd1558a8232bb..63b502531a896 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -18,12 +18,18 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.WriteHandleFactory;
+
+import java.io.Serializable;
+
 /**
  * Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
  * Output spark partition will have records from only one hoodie partition. - Average records per output spark
  * partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
  */
-public interface BulkInsertPartitioner<I> {
+public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
    * Repartitions the input records into at least expected number of output spark partitions.
@@ -38,4 +44,24 @@ public interface BulkInsertPartitioner<I> {
    * @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
    */
   boolean arePartitionRecordsSorted();
+
+  /**
+   * Returns the file group ID prefix for the given data partition.
+   * By default, returns a new file group ID prefix, so that incoming records route to a fresh file group.
+   * @param partitionId data partition
+   * @return the file group ID prefix for the partition
+   */
+  default String getFileIdPfx(int partitionId) {
+    return FSUtils.createNewFileIdPfx();
+  }
+
+  /**
+   * Returns the write handle factory for the given data partition, if the partitioner provides one.
+   * @param partitionId data partition
+   * @return the write handle factory, or empty to fall back to the caller's default
+   */
+  default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
+    return Option.empty();
+  }
 }
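Note: to illustrate what the two new default methods enable, here is a minimal sketch of a custom partitioner that pins each output partition to a pre-computed file group instead of minting a fresh one. The class and field names are hypothetical and not part of this change; only the `BulkInsertPartitioner` contract is taken from the patch.

```java
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.spark.api.java.JavaRDD;

// Hypothetical example: route each output partition to a known file group.
public class PinnedFileIdPartitioner implements BulkInsertPartitioner<JavaRDD<HoodieRecord>> {

  // one pre-computed prefix per output partition
  private final List<String> fileIdPrefixes;

  public PinnedFileIdPartitioner(List<String> fileIdPrefixes) {
    this.fileIdPrefixes = fileIdPrefixes;
  }

  @Override
  public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> records, int outputSparkPartitions) {
    // this sketch only adjusts the partition count; a real implementation
    // would shuffle each record to the partition owning its file group
    return records.coalesce(outputSparkPartitions);
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return false;
  }

  @Override
  public String getFileIdPfx(int partitionId) {
    // override the default (FSUtils.createNewFileIdPfx()) with a pinned prefix
    return fileIdPrefixes.get(partitionId);
  }

  @Override
  public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
    // empty means the caller's default factory is used
    return Option.empty();
  }
}
```

Because the interface now extends Serializable, implementations like this must keep their state serializable (a `List<String>` is), or mark driver-only members transient.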
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index ad2145c3501bf..5355194ff75bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -42,7 +42,7 @@ public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
   public abstract O bulkInsert(I inputRecords, String instantTime,
                                HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
                                boolean performDedupe,
-                               Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                               BulkInsertPartitioner partitioner,
                                boolean addMetadataFields,
                                int parallelism, WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 7d7609f0fa0a9..233c70ecf9eb6 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -41,6 +41,7 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
 import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
@@ -121,16 +122,16 @@ public abstract List<WriteStatus> performClusteringWithRecordList(
    *
    * @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
    * @param schema         Schema of the data including metadata fields.
-   * @return empty for now.
+   * @return the partitioner for the Java engine
    */
-  protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
-      return Option.of(new JavaCustomColumnsSortPartitioner(
+      return new JavaCustomColumnsSortPartitioner(
          strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
          HoodieAvroUtils.addMetadataFields(schema),
-          getWriteConfig().isConsistentLogicalTimestampEnabled()));
+          getWriteConfig().isConsistentLogicalTimestampEnabled());
     } else {
-      return Option.empty();
+      return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
     }
   }
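Note: the helper below applies the same pattern. Shown side by side, the two call shapes from the hunks in this diff, wrapped in an illustrative class so the fragment compiles; resolving the Option once at the entry point removes the `isPresent()/get()` branching from every downstream helper.

```java
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.table.BulkInsertPartitioner;

class PartitionerResolution {

  // Before: each helper unwrapped the Option itself.
  static BulkInsertPartitioner before(Option<BulkInsertPartitioner> userDefined, HoodieWriteConfig config) {
    return userDefined.isPresent()
        ? userDefined.get()
        : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
  }

  // After: resolved once at the entry point, then passed down as a plain value.
  static BulkInsertPartitioner after(Option<BulkInsertPartitioner> userDefined, HoodieWriteConfig config) {
    return userDefined.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
  }
}
```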
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index 39b2916732f2a..e126372aa9068 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -77,8 +77,11 @@ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord<T>> inputRecords,
         config.shouldAllowMultiWriteOnSameInstant());
 
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+
     // write new files
-    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+    List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
@@ -90,7 +93,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
                                       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                                       HoodieWriteConfig config,
                                       boolean performDedupe,
-                                      Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                                      BulkInsertPartitioner partitioner,
                                       boolean useWriterSchema,
                                       int parallelism,
                                       WriteHandleFactory writeHandleFactory) {
@@ -103,12 +106,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
           parallelism, table);
     }
 
-    final List<HoodieRecord<T>> repartitionedRecords;
-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
-        ? userDefinedBulkInsertPartitioner.get()
-        : JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
-    // only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
-    repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+    final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
 
     FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
         config.getFileIdPrefixProviderClassName(),
@@ -119,7 +117,8 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
     new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true, config, instantTime, table,
         fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
-        new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
+        // Always use the write handle factory for partition 0, as there is only a single data partition in the Hudi Java engine.
+        (WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);
 
     return writeStatuses;
   }
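Note: condensed, the handle-selection rule introduced here is "a factory supplied by the partitioner wins; otherwise the caller's factory is used". The wrapper method below is illustrative, not part of the patch; the cast mirrors the one in the diff, which is needed because the partitioner is referenced as a raw type, erasing the `Option<WriteHandleFactory>` return type.

```java
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;

class HandleFactorySelection {

  // Illustrative helper: the partitioner-provided factory takes precedence,
  // falling back to the factory the caller passed in.
  static WriteHandleFactory select(BulkInsertPartitioner partitioner, int partitionId,
                                   WriteHandleFactory callerFactory) {
    return (WriteHandleFactory) partitioner.getWriteHandleFactory(partitionId).orElse(callerFactory);
  }
}
```

The Java engine always asks for partition 0 because it runs with a single data partition; the Spark path below asks per partition.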
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 5a03cdf3bc9a1..e09457f0e5135 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -46,6 +46,7 @@
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
 import org.apache.hudi.io.IOUtils;
@@ -137,7 +138,7 @@ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords,
    * @param schema         Schema of the data including metadata fields.
-   * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
+   * @return a sort partitioner if sort columns are provided, otherwise the default partitioner for the configured bulk insert sort mode.
    */
-  protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
             .map(listStr -> listStr.split(","));
@@ -159,7 +160,7 @@ protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
         default:
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
-    });
+    }).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 4a7ee7bceeacd..b61017c34ce41 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -72,6 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
-    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, Option.empty(), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
+    return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
+        false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
index 24cdd70603cbe..66c3bdddcb1ef 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunction.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.spark.api.java.function.Function2;
@@ -41,27 +42,27 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
   private boolean areRecordsSorted;
   private HoodieWriteConfig config;
   private HoodieTable hoodieTable;
-  private List<String> fileIDPrefixes;
   private boolean useWriterSchema;
+  private BulkInsertPartitioner partitioner;
   private WriteHandleFactory writeHandleFactory;
 
   public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
                                HoodieWriteConfig config, HoodieTable hoodieTable,
-                               List<String> fileIDPrefixes, boolean useWriterSchema,
+                               boolean useWriterSchema, BulkInsertPartitioner partitioner,
                                WriteHandleFactory writeHandleFactory) {
     this.instantTime = instantTime;
     this.areRecordsSorted = areRecordsSorted;
     this.config = config;
     this.hoodieTable = hoodieTable;
-    this.fileIDPrefixes = fileIDPrefixes;
     this.useWriterSchema = useWriterSchema;
     this.writeHandleFactory = writeHandleFactory;
+    this.partitioner = partitioner;
   }
 
   @Override
   public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
     return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
-        fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
-        writeHandleFactory);
+        partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
+        (WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory));
   }
 }
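Note: for readers unfamiliar with where the partition id comes from, Spark's `mapPartitionsWithIndex` hands each task its partition index, which `BulkInsertMapFunction` then forwards to `getFileIdPfx`/`getWriteHandleFactory`. A standalone toy of the same pattern, in plain Spark with no Hudi dependency:

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PartitionIndexDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("partition-index-demo");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      jsc.parallelize(Arrays.asList(1, 2, 3, 4), 2)
          // the Integer argument is the partition id, analogous to the one
          // BulkInsertMapFunction passes to partitioner.getFileIdPfx(partition)
          .mapPartitionsWithIndex(
              (partition, iter) -> Collections.singletonList("partition " + partition).iterator(),
              true)
          .collect()
          .forEach(System.out::println);
    }
  }
}
```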
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 219fb0b165972..50a0a534f881b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -49,9 +49,9 @@
 public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
-  private final HoodieSparkEngineContext sparkEngineContext;
+  private final transient HoodieSparkEngineContext sparkEngineContext;
   private final String[] orderByColumns;
-  private final Schema schema;
+  private final SerializableSchema schema;
   private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
   private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
@@ -64,14 +64,13 @@ public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
     this.orderByColumns = orderByColumns;
     this.layoutOptStrategy = layoutOptStrategy;
     this.curveCompositionStrategyType = curveCompositionStrategyType;
-    this.schema = schema;
+    this.schema = new SerializableSchema(schema);
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
-    SerializableSchema serializableSchema = new SerializableSchema(schema);
     JavaRDD<GenericRecord> genericRecordsRDD =
-        records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
+        records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get());
 
     Dataset<Row> sourceDataset =
         AvroConversionUtils.createDataFrame(
@@ -82,7 +81,7 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
     Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
 
-    return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
+    return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
         .toJavaRDD()
         .map(record -> {
           String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
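Note: these two field changes follow from `BulkInsertPartitioner` now extending `Serializable`: Avro's `Schema` is not `java.io.Serializable`, so it is wrapped in `SerializableSchema`, and the engine context, only needed on the driver, is marked transient. The pattern in isolation, as a hypothetical class (assuming `SerializableSchema` from hudi-common):

```java
import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.hudi.common.config.SerializableSchema;

// Hypothetical class showing the serialization pattern used above.
public class SchemaCarrier implements Serializable {

  // driver-side helper; never shipped to executors
  private final transient Object engineContext;
  // avro Schema is not Serializable, so wrap it
  private final SerializableSchema schema;

  public SchemaCarrier(Object engineContext, Schema schema) {
    this.engineContext = engineContext;
    this.schema = new SerializableSchema(schema);
  }

  public Schema getSchema() {
    return schema.get(); // unwrap wherever the task runs
  }
}
```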
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index 38e38101b0d02..1652c35eb63e6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -20,7 +20,6 @@
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,8 +38,6 @@
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * A spark implementation of {@link BaseBulkInsertHelper}.
@@ -76,9 +73,12 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<HoodieRecord<T>> inputRecords,
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED, executor.getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
+
+    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));
+
     // write new files
-    HoodieData<WriteStatus> writeStatuses =
-        bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
+    HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
+        config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
     //update index
     ((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
     return result;
@@ -90,7 +90,7 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords,
                                             HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                             HoodieWriteConfig config,
                                             boolean performDedupe,
-                                            Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+                                            BulkInsertPartitioner partitioner,
                                             boolean useWriterSchema,
                                             int parallelism,
                                             WriteHandleFactory writeHandleFactory) {
@@ -103,20 +103,12 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords,
           parallelism, table);
     }
 
-    final HoodieData<HoodieRecord<T>> repartitionedRecords;
-    BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
-        ? userDefinedBulkInsertPartitioner.get()
-        : BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
     // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
-    repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
-
-    // generate new file ID prefixes for each output partition
-    final List<String> fileIDPrefixes =
-        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+    final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
 
     JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
         .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
-            partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
+            partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true)
        .flatMap(List::iterator);
 
     return HoodieJavaRDD.of(writeStatusRDD);
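Note: the net effect of removing `fileIDPrefixes`, shown side by side (the "before" body is the code deleted above; `partitionId` stands in for the Spark partition index): prefix creation moves from an eager, driver-side list to a lazy, per-partition call that a custom partitioner can override.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.table.BulkInsertPartitioner;

class FileIdPrefixResolution {

  // Before: one fresh prefix per output partition, materialized on the driver.
  static List<String> before(int parallelism) {
    return IntStream.range(0, parallelism)
        .mapToObj(i -> FSUtils.createNewFileIdPfx())
        .collect(Collectors.toList());
  }

  // After: resolved lazily per partition; the default implementation still
  // calls FSUtils.createNewFileIdPfx(), preserving the old behavior.
  static String after(BulkInsertPartitioner partitioner, int partitionId) {
    return partitioner.getFileIdPfx(partitionId);
  }
}
```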