diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 0cbb192ab9072..008dadd65c84a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -578,12 +578,12 @@ public void cleanHandlesGracefully() { final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ); final HoodieWriteHandle writeHandle; - if (isDelta) { - writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, - table.getTaskContextSupplier()); - } else if (loc.getInstantTime().equals("I")) { + if (loc.getInstantTime().equals("I")) { writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, - fileID, table.getTaskContextSupplier()); + fileID, table.getTaskContextSupplier()); + } else if (isDelta) { + writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, + table.getTaskContextSupplier()); } else { writeHandle = insertClustering ? new FlinkConcatHandle<>(config, instantTime, table, recordItr, partitionPath, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index a65e03da761d3..b0150adaf0dd8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -66,10 +66,7 @@ public HoodieWriteMetadata> upsert( HoodieWriteHandle writeHandle, String instantTime, List> hoodieRecords) { - ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle, - "MOR write handle should always be a FlinkAppendHandle"); - FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; - return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute(); + return new FlinkUpsertDeltaCommitActionExecutor<>(context, writeHandle, config, this, instantTime, hoodieRecords).execute(); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java index 1fe98204b1cb1..42de385ca5e52 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/BaseFlinkDeltaCommitActionExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.execution.FlinkLazyInsertIterable; import org.apache.hudi.io.ExplicitWriteHandleFactory; import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor; @@ -38,7 +39,7 @@ public abstract class BaseFlinkDeltaCommitActionExecutor { public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context, - FlinkAppendHandle writeHandle, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, @@ -48,10 +49,14 @@ public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context, @Override public Iterator> handleUpdate(String partitionPath, String fileId, Iterator> recordItr) { - FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; - appendHandle.doAppend(); - List writeStatuses = appendHandle.close(); - return Collections.singletonList(writeStatuses).iterator(); + if (writeHandle instanceof FlinkAppendHandle) { + FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle; + appendHandle.doAppend(); + List writeStatuses = appendHandle.close(); + return Collections.singletonList(writeStatuses).iterator(); + } else { + return this.handleInsert(fileId, recordItr); + } } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java index 7053f7a16203f..af51f652bbd79 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertDeltaCommitActionExecutor.java @@ -23,7 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.commit.FlinkWriteHelper; @@ -35,7 +35,7 @@ public class FlinkUpsertDeltaCommitActionExecutor> inputRecords; public FlinkUpsertDeltaCommitActionExecutor(HoodieEngineContext context, - FlinkAppendHandle writeHandle, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java index 493c894c8a96c..57b956c52f500 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/delta/FlinkUpsertPreppedDeltaCommitActionExecutor.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.FlinkAppendHandle; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -36,7 +36,7 @@ public class FlinkUpsertPreppedDeltaCommitActionExecutor> preppedRecords; public FlinkUpsertPreppedDeltaCommitActionExecutor(HoodieEngineContext context, - FlinkAppendHandle writeHandle, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 1cf66ea3437ef..dd93444934de7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -326,8 +326,7 @@ private static void setupReadOptions(Configuration conf) { * Sets up the write options from the table definition. */ private static void setupWriteOptions(Configuration conf) { - if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION) - && OptionsResolver.isCowTable(conf)) { + if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)) { conf.setBoolean(FlinkOptions.PRE_COMBINE, true); } }