@@ -18,12 +18,18 @@

package org.apache.hudi.table;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;

import java.io.Serializable;

/**
* Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. - Average records per output spark
* partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
*/
public interface BulkInsertPartitioner<I> {
public interface BulkInsertPartitioner<I> extends Serializable {

/**
* Repartitions the input records into at least expected number of output spark partitions.
Expand All @@ -38,4 +44,24 @@ public interface BulkInsertPartitioner<I> {
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
*/
boolean arePartitionRecordsSorted();

/**
* Return the file group id prefix for the given data partition.
* By default, returns a new file group id prefix, so that incoming records route to a fresh new file group.
* @param partitionId data partition
* @return file group id prefix for the given data partition
*/
default String getFileIdPfx(int partitionId) {
return FSUtils.createNewFileIdPfx();
}

/**
* Return the write handle factory for the given data partition.
* @param partitionId data partition
* @return an optional write handle factory for the given data partition; empty by default
*/
default Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
return Option.empty();
}

}
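
Side note (not part of the diff): a minimal sketch of a custom partitioner built on these new default methods could look like the following. The class name and the repartitionRecords signature are assumptions for illustration only, not code from this PR.

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;

import java.util.List;

// Hypothetical partitioner that keeps the incoming order; it shows how getFileIdPfx can be
// customized per output partition while getWriteHandleFactory falls back to the engine default.
public class IdentityBulkInsertPartitioner<T> implements BulkInsertPartitioner<List<HoodieRecord<T>>> {

  @Override
  public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records, int outputPartitions) {
    return records; // no reshuffling or sorting
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return false;
  }

  @Override
  public String getFileIdPfx(int partitionId) {
    // Same behavior as the default: route every output partition to a fresh file group.
    return FSUtils.createNewFileIdPfx();
  }

  @Override
  public Option<WriteHandleFactory> getWriteHandleFactory(int partitionId) {
    return Option.empty(); // fall back to the factory supplied by the engine
  }
}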
@@ -42,7 +42,7 @@ public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instant
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
BulkInsertPartitioner partitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);
@@ -41,6 +41,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.execution.bulkinsert.JavaBulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.JavaCustomColumnsSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
@@ -121,16 +122,16 @@ public abstract List<WriteStatus> performClusteringWithRecordList(
*
* @param strategyParams Strategy parameters containing columns to sort the data by when clustering.
* @param schema Schema of the data including metadata fields.
* @return empty for now.
* @return partitioner for the java engine
*/
protected Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new JavaCustomColumnsSortPartitioner(
return new JavaCustomColumnsSortPartitioner(
strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema),
getWriteConfig().isConsistentLogicalTimestampEnabled()));
getWriteConfig().isConsistentLogicalTimestampEnabled());
} else {
return Option.empty();
return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
}
}

@@ -77,8 +77,11 @@ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord
config.shouldAllowMultiWriteOnSameInstant());
}

BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -90,7 +93,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
BulkInsertPartitioner partitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -103,12 +106,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
parallelism, table);
}

final List<HoodieRecord<T>> repartitionedRecords;
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
// only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
final List<HoodieRecord<T>> repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);

FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
config.getFileIdPrefixProviderClassName(),
@@ -119,7 +117,8 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
new JavaLazyInsertIterable<>(repartitionedRecords.iterator(), true,
config, instantTime, table,
fileIdPrefixProvider.createFilePrefix(""), table.getTaskContextSupplier(),
new CreateHandleFactory<>()).forEachRemaining(writeStatuses::addAll);
// Always get the first WriteHandleFactory, as there is only a single data partition for hudi java engine.
(WriteHandleFactory) partitioner.getWriteHandleFactory(0).orElse(writeHandleFactory)).forEachRemaining(writeStatuses::addAll);
Contributor: here what's the meaning of passing 0 as partitionId?

Contributor (Author): 0 means getting the write handle factory for partition 0. The code is consistent with the previous behavior, as the Java engine always has only one data partition.

Contributor: we can add some comments here.

Contributor (Author): Sure.

return writeStatuses;
}
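
Side note (not part of the diff): the fallback above can be read as a small helper. This is a hedged sketch mirroring the orElse call, assuming partition 0 is the Java engine's only data partition as discussed in the thread above.

import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;

// Hypothetical helper, mirroring the call above: prefer the factory supplied by the
// partitioner for data partition 0 (the Java engine's only data partition), and fall
// back to the engine default otherwise.
final class WriteHandleFactoryResolver {
  static WriteHandleFactory resolve(BulkInsertPartitioner<?> partitioner,
                                    WriteHandleFactory defaultFactory) {
    return partitioner.getWriteHandleFactory(0).orElse(defaultFactory);
  }
}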
@@ -46,6 +46,7 @@
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
@@ -137,7 +138,7 @@ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final Ho
* @param schema Schema of the data including metadata fields.
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/
protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
@@ -159,7 +160,7 @@ protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
});
}).orElse(BulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode()));
}

/**
@@ -72,6 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
.withProps(getWriteConfig().getProps()).build();
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));

return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
}
@@ -24,6 +24,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.SparkLazyInsertIterable;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;

import org.apache.spark.api.java.function.Function2;
@@ -41,27 +42,27 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload>
private boolean areRecordsSorted;
private HoodieWriteConfig config;
private HoodieTable hoodieTable;
private List<String> fileIDPrefixes;
private boolean useWriterSchema;
private BulkInsertPartitioner partitioner;
private WriteHandleFactory writeHandleFactory;

public BulkInsertMapFunction(String instantTime, boolean areRecordsSorted,
HoodieWriteConfig config, HoodieTable hoodieTable,
List<String> fileIDPrefixes, boolean useWriterSchema,
boolean useWriterSchema, BulkInsertPartitioner partitioner,
WriteHandleFactory writeHandleFactory) {
this.instantTime = instantTime;
this.areRecordsSorted = areRecordsSorted;
this.config = config;
this.hoodieTable = hoodieTable;
this.fileIDPrefixes = fileIDPrefixes;
this.useWriterSchema = useWriterSchema;
this.writeHandleFactory = writeHandleFactory;
this.partitioner = partitioner;
}

@Override
public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> recordItr) {
return new SparkLazyInsertIterable<>(recordItr, areRecordsSorted, config, instantTime, hoodieTable,
fileIDPrefixes.get(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
writeHandleFactory);
partitioner.getFileIdPfx(partition), hoodieTable.getTaskContextSupplier(), useWriterSchema,
(WriteHandleFactory) partitioner.getWriteHandleFactory(partition).orElse(this.writeHandleFactory));
}
}
@@ -49,9 +49,9 @@
public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

private final HoodieSparkEngineContext sparkEngineContext;
private final transient HoodieSparkEngineContext sparkEngineContext;
private final String[] orderByColumns;
private final Schema schema;
private final SerializableSchema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;

@@ -64,14 +64,13 @@ public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContex
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
this.schema = schema;
this.schema = new SerializableSchema(schema);
}

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
SerializableSchema serializableSchema = new SerializableSchema(schema);
Contributor: nice improvement.

JavaRDD<GenericRecord> genericRecordsRDD =
records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
records.map(f -> (GenericRecord) f.getData().getInsertValue(schema.get()).get());

Dataset<Row> sourceDataset =
AvroConversionUtils.createDataFrame(
Expand All @@ -82,7 +81,7 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco

Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);

return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
.toJavaRDD()
.map(record -> {
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
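
Side note (not part of the diff): a minimal sketch of why the schema is wrapped, assuming SerializableSchema lives in org.apache.hudi.common.config. Avro's Schema is not Serializable, so a partitioner that now extends Serializable cannot hold it as a plain field without breaking Spark task serialization.

import org.apache.avro.Schema;
import org.apache.hudi.common.config.SerializableSchema;

import java.io.Serializable;

// Illustrative only: SerializableSchema wraps the Avro schema so the enclosing object can be
// serialized with the Spark task and the schema re-materialized on the executor side.
public class SchemaHolder implements Serializable {
  private final SerializableSchema schema;

  public SchemaHolder(Schema avroSchema) {
    this.schema = new SerializableSchema(avroSchema);
  }

  public Schema get() {
    return schema.get(); // rebuilt Avro schema, safe to use after deserialization
  }
}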
@@ -20,7 +20,6 @@

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -39,8 +38,6 @@
import org.apache.spark.api.java.JavaRDD;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* A spark implementation of {@link BaseBulkInsertHelper}.
@@ -76,9 +73,12 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(HoodieInstant.State.REQUESTED,
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());

BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));

// write new files
HoodieData<WriteStatus> writeStatuses =
bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
HoodieData<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner, false,
config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -90,7 +90,7 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecor
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
BulkInsertPartitioner partitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -103,20 +103,12 @@ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecor
parallelism, table);
}

final HoodieData<HoodieRecord<T>> repartitionedRecords;
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
// only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));

// generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
final HoodieData<HoodieRecord<T>> repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));

JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
.mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
partitioner.arePartitionRecordsSorted(), config, table, useWriterSchema, partitioner, writeHandleFactory), true)
.flatMap(List::iterator);

return HoodieJavaRDD.of(writeStatusRDD);