diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java new file mode 100644 index 0000000000000..46eff587575cc --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.execution; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; +import org.apache.hudi.io.HoodieWriteHandle; + +import java.util.ArrayList; +import java.util.List; + +/** + * Consumes stream of hoodie records from in-memory queue and writes to one explicit create handle. + */ +public class ExplicitWriteHandler + extends BoundedInMemoryQueueConsumer, List> { + + private final List statuses = new ArrayList<>(); + + private HoodieWriteHandle handle; + + public ExplicitWriteHandler(HoodieWriteHandle handle) { + this.handle = handle; + } + + @Override + public void consumeOneRecord(HoodieLazyInsertIterable.HoodieInsertValueGenResult payload) { + final HoodieRecord insertPayload = payload.record; + handle.write(insertPayload, payload.insertValue, payload.exception); + } + + @Override + public void finish() { + closeOpenHandle(); + assert statuses.size() > 0; + } + + @Override + public List getResult() { + return statuses; + } + + private void closeOpenHandle() { + statuses.addAll(handle.close()); + } +} + diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index b0674b2a134d0..78b3cb1dc61f7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -27,7 +27,8 @@ import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.WriteHandleFactory; +import org.apache.hudi.io.ExplicitWriteHandleFactory; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.avro.Schema; @@ -36,15 +37,6 @@ import java.util.List; public class FlinkLazyInsertIterable extends HoodieLazyInsertIterable { - public FlinkLazyInsertIterable(Iterator> recordItr, - boolean areRecordsSorted, - HoodieWriteConfig config, - String instantTime, - HoodieTable hoodieTable, - String idPrefix, - TaskContextSupplier taskContextSupplier) { - super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier); - } public FlinkLazyInsertIterable(Iterator> recordItr, boolean areRecordsSorted, @@ -53,7 +45,7 @@ public FlinkLazyInsertIterable(Iterator> recordItr, HoodieTable hoodieTable, String idPrefix, TaskContextSupplier taskContextSupplier, - WriteHandleFactory writeHandleFactory) { + ExplicitWriteHandleFactory writeHandleFactory) { super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory); } @@ -64,8 +56,8 @@ protected List computeNext() { null; try { final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema()); - bufferedIteratorExecutor = - new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), Option.of(getInsertHandler()), getTransformFunction(schema, hoodieConfig)); + bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(inputItr), + Option.of(getExplicitInsertHandler()), getTransformFunction(schema, hoodieConfig)); final List result = bufferedIteratorExecutor.execute(); assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining(); return result; @@ -77,4 +69,10 @@ protected List computeNext() { } } } + + @SuppressWarnings("rawtypes") + private ExplicitWriteHandler getExplicitInsertHandler() { + HoodieWriteHandle handle = ((ExplicitWriteHandleFactory) writeHandleFactory).getWriteHandle(); + return new ExplicitWriteHandler(handle); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java index 092e945f0f9d3..e598a033750dd 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitWriteHandleFactory.java @@ -28,7 +28,7 @@ */ public class ExplicitWriteHandleFactory extends WriteHandleFactory { - private HoodieWriteHandle writeHandle; + private final HoodieWriteHandle writeHandle; public ExplicitWriteHandleFactory(HoodieWriteHandle writeHandle) { this.writeHandle = writeHandle; @@ -41,4 +41,8 @@ public HoodieWriteHandle create( String fileIdPrefix, TaskContextSupplier taskContextSupplier) { return writeHandle; } + + public HoodieWriteHandle getWriteHandle() { + return writeHandle; + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 1872637aeefde..b514896aa1e3a 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -74,6 +74,11 @@ protected void createMarkerFile(String partitionPath, String dataFileName) { writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType()); } + @Override + public boolean canWrite(HoodieRecord record) { + return true; + } + @Override protected boolean needsUpdateLocation() { return false;