Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,12 @@ public void cleanHandlesGracefully() {

final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
if (loc.getInstantTime().equals("I")) {
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
fileID, table.getTaskContextSupplier());
} else if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else {
writeHandle = insertClustering
? new FlinkConcatHandle<>(config, instantTime, table, recordItr, partitionPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> hoodieRecords) {
ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
"MOR write handle should always be a FlinkAppendHandle");
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
return new FlinkUpsertDeltaCommitActionExecutor<>(context, writeHandle, config, this, instantTime, hoodieRecords).execute();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hudi.execution.FlinkLazyInsertIterable;
import org.apache.hudi.io.ExplicitWriteHandleFactory;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor;

Expand All @@ -38,7 +39,7 @@ public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordP
extends BaseFlinkCommitActionExecutor<T> {

public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
Expand All @@ -48,10 +49,14 @@ public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context,

@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
appendHandle.doAppend();
List<WriteStatus> writeStatuses = appendHandle.close();
return Collections.singletonList(writeStatuses).iterator();
if (writeHandle instanceof FlinkAppendHandle) {
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
appendHandle.doAppend();
List<WriteStatus> writeStatuses = appendHandle.close();
return Collections.singletonList(writeStatuses).iterator();
} else {
return this.handleInsert(fileId, recordItr);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
Expand All @@ -35,7 +35,7 @@ public class FlinkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<
private final List<HoodieRecord<T>> inputRecords;

public FlinkUpsertDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

Expand All @@ -36,7 +36,7 @@ public class FlinkUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordP
private final List<HoodieRecord<T>> preppedRecords;

public FlinkUpsertPreppedDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,7 @@ private static void setupReadOptions(Configuration conf) {
* Sets up the write options from the table definition.
*/
private static void setupWriteOptions(Configuration conf) {
if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
&& OptionsResolver.isCowTable(conf)) {
if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)) {
conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
}
}
Expand Down