@@ -459,17 +459,17 @@ private FlinkOptions() {
.withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
+ "waits for the instant commit success, only for internal use");

public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
.key("write.bulk_insert.shuffle_by_partition")
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_INPUT = ConfigOptions
.key("write.bulk_insert.shuffle_input")
.booleanType()
.defaultValue(true)
.withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
.withDescription("Whether to shuffle the inputs by specific fields for bulk insert tasks, default true");

public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
.key("write.bulk_insert.sort_by_partition")
public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT = ConfigOptions
.key("write.bulk_insert.sort_input")
.booleanType()
.defaultValue(true)
.withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
.withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");

public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
.key("write.sort.memory")
@@ -19,14 +19,19 @@
package org.apache.hudi.sink.bucket;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.bucket.BucketIdentifier;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.table.HoodieTable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,36 +43,67 @@
*/
public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
public static final String FILE_GROUP_META_FIELD = "_fg";

private final int bucketNum;
private final String indexKeyFields;
private final int recordArity;

private String lastFileId; // for efficient code path

public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable<?, ?, ?, ?> hoodieTable, HoodieWriteConfig writeConfig,
String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
this.recordArity = rowType.getFieldCount();
}

public void write(RowData record) throws IOException {
public void write(RowData tuple) throws IOException {
try {
RowData record = tuple.getRow(1, this.recordArity);
String recordKey = keyGen.getRecordKey(record);
String partitionPath = keyGen.getPartitionPath(record);
final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
String fileId = tuple.getString(0).toString();
if ((lastFileId == null) || !lastFileId.equals(fileId)) {
LOG.info("Creating new file for partition path " + partitionPath);
handle = getRowCreateHandle(partitionPath, fileId);
lastFileId = fileId;
}
handle.write(recordKey, partitionPath, record);
} catch (Throwable throwable) {
LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
throw throwable;
}
}

private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) throws IOException {
if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
if (this.isInputSorted) {
// if records are sorted, we can close all existing handles
close();
}
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
instantTime, taskPartitionId, taskId, taskEpochId, rowType);
handles.put(fileId, rowCreateHandle);
}
return handles.get(fileId);
}

public static SortOperatorGen getFileIdSorterGen(RowType rowType) {
return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
}

private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
String recordKey = keyGen.getRecordKey(record);
final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
return BucketIdentifier.newBucketFileIdPrefix(bucketId);
}

public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
return GenericRowData.of(StringData.fromString(fileId), record);
}

public static RowType rowTypeWithFileId(RowType rowType) {
LogicalType[] types = new LogicalType[] {DataTypes.STRING().getLogicalType(), rowType};
String[] names = new String[] {FILE_GROUP_META_FIELD, "record"};
return RowType.of(types, names);
}
}
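
A toy illustration of the (_fg, record) tuple layout the helper now consumes; the file id and field values are made up, and only the wrapping and unwrapping calls mirror rowWithFileId and write(tuple) above.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class FileIdTupleSketch {
  public static void main(String[] args) {
    // Hypothetical payload row with two string fields (arity 2).
    RowData record = GenericRowData.of(StringData.fromString("id1"), StringData.fromString("par1"));

    // Upstream map stage: prepend the pre-computed bucket file id, as rowWithFileId does.
    RowData tuple = GenericRowData.of(StringData.fromString("00000001"), record);

    // Writer side: position 0 carries the file id, position 1 the original record.
    String fileId = tuple.getString(0).toString();
    RowData unwrapped = tuple.getRow(1, 2); // 2 == recordArity of the payload row
    System.out.println(fileId + " -> " + unwrapped.getString(0));
  }
}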
@@ -57,9 +57,9 @@ public class BulkInsertWriterHelper {
protected final HoodieTable hoodieTable;
protected final HoodieWriteConfig writeConfig;
protected final RowType rowType;
private final Boolean arePartitionRecordsSorted;
protected final Boolean isInputSorted;
private final List<HoodieInternalWriteStatus> writeStatusList = new ArrayList<>();
private HoodieRowDataCreateHandle handle;
protected HoodieRowDataCreateHandle handle;
private String lastKnownPartitionPath = null;
private final String fileIdPrefix;
private int numFilesWritten = 0;
@@ -75,7 +75,7 @@ public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, Hoodi
this.taskId = taskId;
this.taskEpochId = taskEpochId;
this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
this.fileIdPrefix = UUID.randomUUID().toString();
this.keyGen = RowDataKeyGen.instance(conf, rowType);
}
@@ -112,7 +112,7 @@ public List<HoodieInternalWriteStatus> getHoodieWriteStatuses() throws IOExcepti
private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
// if records are sorted, we can close all existing handles
if (arePartitionRecordsSorted) {
if (isInputSorted) {
close();
}
HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
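
A compressed, self-contained sketch of why sorted input lets the writer flush eagerly: once the key changes it will never come back, so every existing handle can be closed. The map below stands in for the handles bookkeeping; it is not Hudi code.

import java.util.LinkedHashMap;
import java.util.Map;

public class SortedInputHandleSketch {
  private static final Map<String, String> HANDLES = new LinkedHashMap<>();

  static String handleFor(String key, boolean inputSorted) {
    if (!HANDLES.containsKey(key)) {
      if (inputSorted) {
        HANDLES.clear(); // sorted input: earlier keys cannot reappear, so flush everything
      }
      HANDLES.put(key, "handle-" + key);
    }
    return HANDLES.get(key);
  }

  public static void main(String[] args) {
    for (String key : new String[] {"par1", "par1", "par2"}) {
      System.out.println(handleFor(key, true) + " (open: " + HANDLES.size() + ")");
    }
  }
}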
@@ -26,6 +26,7 @@
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
@@ -53,6 +54,7 @@
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

/**
@@ -92,7 +94,18 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
.map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
typeInfo);
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
return dataStream
.transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
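
To make the new shuffle/sort key concrete: the _fg column carries a file id derived from the record key, so records of the same bucket end up adjacent after the file_sorter stage. A toy derivation follows, with the record key, index field, and bucket count purely hypothetical; the two BucketIdentifier calls are the same ones used in the helper above.

import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketFileIdSketch {
  public static void main(String[] args) {
    String recordKey = "id1";     // hypothetical record key
    String indexKeyFields = "id"; // value of FlinkOptions.INDEX_KEY_FIELD
    int numBuckets = 4;           // value of FlinkOptions.BUCKET_INDEX_NUM_BUCKETS

    int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, numBuckets);
    String fileIdPrefix = BucketIdentifier.newBucketFileIdPrefix(bucketId);
    // Same key -> same bucket -> same file id prefix, which is what the sort groups by.
    System.out.println(recordKey + " -> bucket " + bucketId + " -> " + fileIdPrefix);
  }
}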
@@ -103,7 +116,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
if (partitionFields.length > 0) {
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {

// shuffle by partition keys
// use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
@@ -112,7 +125,7 @@ public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowT
KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
}
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
// sort by partition keys
dataStream = dataStream
@@ -916,7 +916,7 @@ void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
String hoodieTableDDL = sql("hoodie_sink")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.OPERATION, "bulk_insert")
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
.option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
.option(FlinkOptions.INDEX_TYPE, indexType)
.option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
.end();
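
For SQL users, the renamed table options look like the following; the table name, schema, and path are hypothetical, and only the option keys reflect this change.

public class BulkInsertDdlSketch {
  public static void main(String[] args) {
    String ddl = "create table hoodie_sink(\n"
        + "  uuid varchar(20),\n"
        + "  age int,\n"
        + "  `partition` varchar(10)\n"
        + ") partitioned by (`partition`) with (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = '/tmp/hoodie_sink',\n"
        + "  'write.operation' = 'bulk_insert',\n"
        + "  'write.bulk_insert.shuffle_input' = 'true',\n"
        + "  'write.bulk_insert.sort_input' = 'true'\n"
        + ")";
    System.out.println(ddl);
  }
}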