diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index cd5c2a70e1d48..b1e00875de9f2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -459,17 +459,17 @@ private FlinkOptions() {
       .withDescription("Timeout limit for a writer task after it finishes a checkpoint and\n"
           + "waits for the instant commit success, only for internal use");
 
-  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION = ConfigOptions
-      .key("write.bulk_insert.shuffle_by_partition")
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SHUFFLE_INPUT = ConfigOptions
+      .key("write.bulk_insert.shuffle_input")
       .booleanType()
       .defaultValue(true)
-      .withDescription("Whether to shuffle the inputs by partition path for bulk insert tasks, default true");
+      .withDescription("Whether to shuffle the inputs by specific fields for bulk insert tasks, default true");
 
-  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_BY_PARTITION = ConfigOptions
-      .key("write.bulk_insert.sort_by_partition")
+  public static final ConfigOption<Boolean> WRITE_BULK_INSERT_SORT_INPUT = ConfigOptions
+      .key("write.bulk_insert.sort_input")
       .booleanType()
       .defaultValue(true)
-      .withDescription("Whether to sort the inputs by partition path for bulk insert tasks, default true");
+      .withDescription("Whether to sort the inputs by specific fields for bulk insert tasks, default true");
 
   public static final ConfigOption<Integer> WRITE_SORT_MEMORY = ConfigOptions
       .key("write.sort.memory")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index c52b6f0e98243..ae646d9e0aac3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -19,14 +19,19 @@
 package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
 import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,36 +43,67 @@
  */
 public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
   private static final Logger LOG = LoggerFactory.getLogger(BucketBulkInsertWriterHelper.class);
+  public static final String FILE_GROUP_META_FIELD = "_fg";
 
-  private final int bucketNum;
-  private final String indexKeyFields;
+  private final int recordArity;
+
+  private String lastFileId; // for efficient code path
 
   public BucketBulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, HoodieWriteConfig writeConfig,
                                       String instantTime, int taskPartitionId, long taskId, long taskEpochId, RowType rowType) {
     super(conf, hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, rowType);
-    this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
-    this.indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+    this.recordArity = rowType.getFieldCount();
   }
 
-  public void write(RowData record) throws IOException {
+  public void write(RowData tuple) throws IOException {
     try {
+      RowData record = tuple.getRow(1, this.recordArity);
       String recordKey = keyGen.getRecordKey(record);
       String partitionPath = keyGen.getPartitionPath(record);
-      final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeyFields, this.bucketNum);
-      String fileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
-      getRowCreateHandle(partitionPath, fileId).write(recordKey, partitionPath, record);
+      String fileId = tuple.getString(0).toString();
+      if ((lastFileId == null) || !lastFileId.equals(fileId)) {
+        LOG.info("Creating new file for partition path " + partitionPath);
+        handle = getRowCreateHandle(partitionPath, fileId);
+        lastFileId = fileId;
+      }
+      handle.write(recordKey, partitionPath, record);
     } catch (Throwable throwable) {
       LOG.error("Global error thrown while trying to write records in HoodieRowDataCreateHandle", throwable);
       throw throwable;
     }
   }
 
-  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) {
+  private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath, String fileId) throws IOException {
     if (!handles.containsKey(fileId)) { // if there is no handle corresponding to the fileId
+      if (this.isInputSorted) {
+        // if records are sorted, we can close all existing handles
+        close();
+      }
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, fileId,
           instantTime, taskPartitionId, taskId, taskEpochId, rowType);
       handles.put(fileId, rowCreateHandle);
     }
     return handles.get(fileId);
   }
+
+  public static SortOperatorGen getFileIdSorterGen(RowType rowType) {
+    return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
+  }
+
+  private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+    String recordKey = keyGen.getRecordKey(record);
+    final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
+    return BucketIdentifier.newBucketFileIdPrefix(bucketId);
+  }
+
+  public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+    final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
+    return GenericRowData.of(StringData.fromString(fileId), record);
+  }
+
+  public static RowType rowTypeWithFileId(RowType rowType) {
+    LogicalType[] types = new LogicalType[] {DataTypes.STRING().getLogicalType(), rowType};
+    String[] names = new String[] {FILE_GROUP_META_FIELD, "record"};
+    return RowType.of(types, names);
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 013595a9d236a..4e1d189b5510f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -57,9 +57,9 @@ public class BulkInsertWriterHelper {
   protected final HoodieTable hoodieTable;
   protected final HoodieWriteConfig writeConfig;
   protected final RowType rowType;
-  private final Boolean arePartitionRecordsSorted;
+  protected final Boolean isInputSorted;
   private final List writeStatusList = new ArrayList<>();
-  private HoodieRowDataCreateHandle handle;
+  protected HoodieRowDataCreateHandle handle;
   private String lastKnownPartitionPath = null;
   private final String fileIdPrefix;
   private int numFilesWritten = 0;
@@ -75,7 +75,7 @@ public BulkInsertWriterHelper(Configuration conf, HoodieTable hoodieTable, Hoodi
     this.taskId = taskId;
     this.taskEpochId = taskEpochId;
     this.rowType = addMetadataFields(rowType, writeConfig.allowOperationMetadataField()); // patch up with metadata fields
-    this.arePartitionRecordsSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION);
+    this.isInputSorted = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
     this.fileIdPrefix = UUID.randomUUID().toString();
     this.keyGen = RowDataKeyGen.instance(conf, rowType);
   }
@@ -112,7 +112,7 @@ public List getHoodieWriteStatuses() throws IOExcepti
   private HoodieRowDataCreateHandle getRowCreateHandle(String partitionPath) throws IOException {
     if (!handles.containsKey(partitionPath)) { // if there is no handle corresponding to the partition path
       // if records are sorted, we can close all existing handles
-      if (arePartitionRecordsSorted) {
+      if (isInputSorted) {
         close();
       }
       HoodieRowDataCreateHandle rowCreateHandle = new HoodieRowDataCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(),
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 4930dbe0b3937..1992eddd63381 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
 import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
@@ -53,6 +54,7 @@
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
 /**
@@ -92,7 +94,18 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
-      return dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+      RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+      InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
+      dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
+              typeInfo);
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+        SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+        dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
+        ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+            conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+      }
+      return dataStream
           .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
@@ -103,7 +116,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
     final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf);
     if (partitionFields.length > 0) {
       RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
-      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) {
 
         // shuffle by partition keys
         // use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
@@ -112,7 +125,7 @@ public static DataStreamSink bulkInsert(Configuration conf, RowType rowT
             KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
         dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
       }
-      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
+      if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
         // sort by partition keys
         dataStream = dataStream
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 1ef7157ab6dd9..8802cac7b6e1c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -916,7 +916,7 @@ void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
     String hoodieTableDDL = sql("hoodie_sink")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.OPERATION, "bulk_insert")
-        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION, true)
+        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
        .option(FlinkOptions.INDEX_TYPE, indexType)
        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
        .end();
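
Usage note (reviewer sketch, not part of the patch): with this change the bucket-index bulk_insert pipeline shuffles records by record key, wraps each record as a (_fg, record) row, optionally sorts by the _fg field when write.bulk_insert.sort_input is true, and the writer only opens a new handle when the file id changes. A minimal Flink SQL example that exercises the renamed options is sketched below; the class name, table name, schema, and path are illustrative, and the 'connector', 'path', and 'write.operation' keys are assumed to be the existing Hudi Flink connector options.

// Sketch only: illustrative table and path; 'write.bulk_insert.shuffle_input'
// and 'write.bulk_insert.sort_input' are the option keys renamed by this patch
// (both default to true).
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BulkInsertOptionsExample {
  public static void main(String[] args) throws Exception {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tableEnv.executeSql(
        "CREATE TABLE hoodie_sink (\n"
            + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,\n"
            + "  name VARCHAR(10),\n"
            + "  ts TIMESTAMP(3),\n"
            + "  `partition` VARCHAR(20)\n"
            + ") PARTITIONED BY (`partition`) WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = '/tmp/hoodie_sink',\n"
            + "  'write.operation' = 'bulk_insert',\n"
            + "  'write.bulk_insert.shuffle_input' = 'true',\n"  // shuffle inputs by the key fields
            + "  'write.bulk_insert.sort_input' = 'true'\n"      // sort inputs before writing
            + ")");
    // write one illustrative row through the bulk_insert code path
    tableEnv.executeSql("INSERT INTO hoodie_sink VALUES "
        + "('id1', 'Danny', TIMESTAMP '2022-01-01 00:00:01', 'par1')").await();
  }
}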