diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index cb8cb11da51f1..4135cad49745d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -246,7 +246,21 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instan
 
   @Override
   public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
-    throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
+    // only used for metadata table, the bulk_insert happens in single JVM process
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
+    table.validateInsertSchema();
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
+    Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = preppedRecords.stream().parallel()
+        .collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
+    return preppedRecordsByFileId.values().stream().parallel().map(records -> {
+      HoodieWriteMetadata<List<WriteStatus>> result;
+      records.get(0).getCurrentLocation().setInstantTime("I");
+      try (AutoCloseableWriteHandle closeableHandle = new AutoCloseableWriteHandle(records, instantTime, table, true)) {
+        result = ((HoodieFlinkTable) table).bulkInsertPrepped(context, closeableHandle.getWriteHandle(), instantTime, records);
+      }
+      return postWrite(result, instantTime, table);
+    }).flatMap(Collection::stream).collect(Collectors.toList());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 9378274a12fef..ac5299c7ba806 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -107,6 +108,17 @@ protected void initRegistry() {
 
   @Override
   protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
+    doCommit(instantTime, partitionRecordsMap, false);
+  }
+
+  @Override
+  protected void bulkCommit(String instantTime, MetadataPartitionType partitionType, HoodieData<HoodieRecord> records, int fileGroupCount) {
+    Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap = new HashMap<>();
+    partitionRecordsMap.put(partitionType, records);
+    doCommit(instantTime, partitionRecordsMap, true);
+  }
+
+  private void doCommit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, boolean isInitializing) {
     ValidationUtils.checkState(metadataMetaClient != null, "Metadata table is not fully initialized yet.");
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     List<HoodieRecord> preppedRecordList = preppedRecords.collectAsList();
@@ -149,9 +161,9 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap) {
-    List<WriteStatus> statuses = preppedRecordList.size() > 0
-        ? writeClient.upsertPreppedRecords(preppedRecordList, instantTime)
-        : Collections.emptyList();
+    List<WriteStatus> statuses = isInitializing
+        ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
+        : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
     // flink does not support auto-commit yet, also the auto commit logic is not complete as BaseHoodieWriteClient now.
     writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
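Aside: the rewritten bulkInsertPreppedRecords above stays inside a single JVM. It groups the already-located records by their target file id, sets each group's location instant time to "I" (the marker the Flink write path uses for an insert bucket, versus "U" for updates), and writes each group through its own handle. A minimal, Hudi-free sketch of that group-then-write-per-file-group shape follows; the Rec type and writeGroup method are illustrative stand-ins, not Hudi APIs.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupPerFileGroupSketch {
  // Illustrative stand-in for a prepped record that already knows its target file group.
  record Rec(String fileId, String key) { }

  // Stand-in for "open one write handle for this file group and write all of its records".
  static String writeGroup(String fileId, List<Rec> group) {
    return fileId + " -> " + group.size() + " records";
  }

  public static void main(String[] args) {
    List<Rec> prepped = List.of(new Rec("f1", "k1"), new Rec("f2", "k2"), new Rec("f1", "k3"));

    // Same shape as the patch: bucket the prepped records by file id ...
    Map<String, List<Rec>> byFileId = prepped.stream()
        .collect(Collectors.groupingBy(Rec::fileId));

    // ... then handle each bucket independently and collect the per-bucket results.
    List<String> statuses = byFileId.entrySet().stream()
        .map(e -> writeGroup(e.getKey(), e.getValue()))
        .collect(Collectors.toList());

    statuses.forEach(System.out::println);
  }
}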
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
index 4145c9630cd7f..0e71b852ca08b 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java
@@ -35,7 +35,7 @@ public interface ExplicitWriteHandleTable {
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
@@ -53,7 +53,7 @@ HoodieWriteMetadata<List<WriteStatus>> upsert(
   /**
    * Insert a batch of new records into Hoodie table at the supplied instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
@@ -72,7 +72,7 @@ HoodieWriteMetadata<List<WriteStatus>> insert(
   /**
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
    * de-duped and non existent keys will be removed before deleting.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
@@ -92,12 +92,12 @@ HoodieWriteMetadata<List<WriteStatus>> delete(
    *
    * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
    * @param instantTime Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to upsert
    * @return HoodieWriteMetadata
    */
   HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -111,12 +111,12 @@ HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
    *
    * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
    * @param instantTime Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to insert
    * @return HoodieWriteMetadata
    */
   HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -125,6 +125,25 @@ HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
       String instantTime,
       List<HoodieRecord<T>> preppedRecords);
 
+  /**
+   * Bulk inserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
+   * the underneath file.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords Hoodie records to bulk_insert
+   * @return HoodieWriteMetadata
+   */
+  HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords);
+
   /**
    * Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
    * for the partition paths contained in input records.
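The interface change above follows the existing ExplicitWriteHandleTable convention: every operation, including the new bulkInsertPrepped, receives its write handle from the caller instead of creating one internally, and the caller (see the AutoCloseableWriteHandle try-with-resources in HoodieFlinkWriteClient above) owns the handle's lifecycle. A small generic sketch of that caller-owned-handle pattern, with placeholder types rather than Hudi classes:

import java.util.List;

public class ExplicitHandleSketch {
  // Placeholder for a write handle whose lifecycle is owned by the caller.
  static class Handle implements AutoCloseable {
    void write(String record) { System.out.println("write " + record); }
    @Override
    public void close() { System.out.println("handle closed"); }
  }

  // Placeholder for a table operation that uses, but does not own, the handle.
  static void bulkInsertPrepped(Handle handle, List<String> preppedRecords) {
    preppedRecords.forEach(handle::write);
  }

  public static void main(String[] args) {
    // The caller decides when the handle is created and closed; the operation only writes through it.
    try (Handle handle = new Handle()) {
      bulkInsertPrepped(handle, List.of("r1", "r2"));
    }
  }
}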

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index cb046e2e91fee..e07eff4cf3dca 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -54,6 +54,7 @@ import org.apache.hudi.table.action.clean.CleanActionExecutor;
 import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
 import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
+import org.apache.hudi.table.action.commit.FlinkBulkInsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
@@ -94,7 +95,7 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
@@ -114,7 +115,7 @@ public HoodieWriteMetadata<List<WriteStatus>> upsert(
   /**
    * Insert a batch of new records into Hoodie table at the supplied instantTime.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
@@ -135,7 +136,7 @@ public HoodieWriteMetadata<List<WriteStatus>> insert(
    * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
    * de-duped and non existent keys will be removed before deleting.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
@@ -157,12 +158,12 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(
    *
    * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
    * @param instantTime Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to upsert
    * @return HoodieWriteMetadata
    */
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
@@ -178,12 +179,12 @@ public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(
    *
    * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
    *
-   * <p>Specifies the write handle explicitly in order to have fine grained control with
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
    * the underneath file.
    *
    * @param context HoodieEngineContext
    * @param instantTime Instant Time for the action
-   * @param preppedRecords hoodieRecords to upsert
+   * @param preppedRecords Hoodie records to insert
    * @return HoodieWriteMetadata
    */
   public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
@@ -194,6 +195,27 @@ public HoodieWriteMetadata<List<WriteStatus>> insertPrepped(
     return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute();
   }
 
+  /**
+   * Bulk inserts the given prepared records into the Hoodie table, at the supplied instantTime.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>Specifies the write handle explicitly in order to have fine-grained control with
+   * the underneath file.
+   *
+   * @param context HoodieEngineContext
+   * @param instantTime Instant Time for the action
+   * @param preppedRecords Hoodie records to bulk_insert
+   * @return HoodieWriteMetadata
+   */
+  public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(
+      HoodieEngineContext context,
+      HoodieWriteHandle writeHandle,
+      String instantTime,
+      List<HoodieRecord<T>> preppedRecords) {
+    return new FlinkBulkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute();
+  }
+
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> insertOverwrite(
       HoodieEngineContext context,
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
new file mode 100644
index 0000000000000..70c3541997061
--- /dev/null
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkBulkInsertPreppedCommitActionExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import java.util.List;
+
+/**
+ * Flink bulk insert prepped commit action executor.
+ */
+public class FlinkBulkInsertPreppedCommitActionExecutor<T> extends BaseFlinkCommitActionExecutor<T> {
+
+  private final List<HoodieRecord<T>> preppedRecords;
+
+  public FlinkBulkInsertPreppedCommitActionExecutor(HoodieEngineContext context,
+                                                    HoodieWriteHandle writeHandle,
+                                                    HoodieWriteConfig config, HoodieTable table,
+                                                    String instantTime, List<HoodieRecord<T>> preppedRecords) {
+    super(context, writeHandle, config, table, instantTime, WriteOperationType.BULK_INSERT_PREPPED);
+    this.preppedRecords = preppedRecords;
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute() {
+    return super.execute(preppedRecords);
+  }
+}
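Taken together with the FlinkHoodieBackedTableMetadataWriter change earlier in the diff, the metadata table now has two commit paths: bulkCommit wraps a single partition's records and marks the commit as an initialization, and doCommit dispatches to bulk insert for that first write and to upsert for later delta commits. A compact, dependency-free sketch of that dispatch (the method names mirror the patch; the bodies are placeholders):

import java.util.List;

public class DoCommitDispatchSketch {
  // Placeholder for writeClient.bulkInsertPreppedRecords(...).
  static List<String> bulkInsertPrepped(List<String> records) {
    return List.of("bulk_insert of " + records.size() + " records");
  }

  // Placeholder for writeClient.upsertPreppedRecords(...).
  static List<String> upsertPrepped(List<String> records) {
    return List.of("upsert of " + records.size() + " records");
  }

  // Mirrors doCommit(instantTime, partitionRecordsMap, isInitializing).
  static List<String> doCommit(List<String> records, boolean isInitializing) {
    return isInitializing
        ? bulkInsertPrepped(records)  // first write of a metadata partition
        : upsertPrepped(records);     // every subsequent delta commit
  }

  public static void main(String[] args) {
    System.out.println(doCommit(List.of("r1", "r2"), true));
    System.out.println(doCommit(List.of("r3"), false));
  }
}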