diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 8fbd1be247217..357cf1b3bedee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -53,11 +53,11 @@ public class HoodieCreateHandle extends private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class); - private final HoodieFileWriter fileWriter; - private final Path path; - private long recordsWritten = 0; - private long insertRecordsWritten = 0; - private long recordsDeleted = 0; + protected final HoodieFileWriter fileWriter; + protected final Path path; + protected long recordsWritten = 0; + protected long insertRecordsWritten = 0; + protected long recordsDeleted = 0; private Map> recordMap; private boolean useWriterSchema = false; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d5df2cb226665..0716050b7b02e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -95,10 +95,10 @@ public class HoodieMergeHandle extends H protected HoodieFileWriter fileWriter; protected Path newFilePath; - private Path oldFilePath; + protected Path oldFilePath; protected long recordsWritten = 0; - private long recordsDeleted = 0; - private long updatedRecordsWritten = 0; + protected long recordsDeleted = 0; + protected long updatedRecordsWritten = 0; protected long insertRecordsWritten = 0; protected boolean useWriterSchema; private HoodieBaseFile baseFileToMerge; @@ -132,6 +132,13 @@ public Schema getWriterSchema() { return writerSchema; } + /** + * Returns the data file name. + */ + protected String generatesDataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + } + /** * Extract old file path, initialize StorageWriter and WriteStatus. */ @@ -149,7 +156,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo partitionMetadata.trySave(getPartitionId()); oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); - String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String newFileName = generatesDataFileName(); String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + newFileName).toString(); newFilePath = new Path(config.getBasePath(), relativePath); @@ -177,18 +184,25 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo } /** - * Load the new incoming records in a map and return partitionPath. + * Initialize a spillable map for incoming records. 
*/ - private void init(String fileId, Iterator> newRecordsItr) { + protected void initializeIncomingRecordsMap() { try { // Load the new records in a map long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps()); LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), - new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } + } + + /** + * Load the new incoming records in a map and return partitionPath. + */ + protected void init(String fileId, Iterator> newRecordsItr) { + initializeIncomingRecordsMap(); while (newRecordsItr.hasNext()) { HoodieRecord record = newRecordsItr.next(); // update the new location of the record, so we know where to find it next diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 0c87f7df9f308..9a7f5e8e4acbf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; @@ -39,6 +40,10 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.io.FlinkCreateHandle; +import org.apache.hudi.io.FlinkMergeHandle; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.io.MiniBatchHandle; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; @@ -50,6 +55,7 @@ import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -58,12 +64,19 @@ public class HoodieFlinkWriteClient extends AbstractHoodieWriteClient>, List, List> { + /** + * FileID to write handle mapping in order to record the write handles for each file group, + * so that we can append the mini-batch data buffer incrementally. 
+ */ + private Map> bucketToHandles; + public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { - super(context, clientConfig); + this(context, clientConfig, false); } public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) { super(context, writeConfig, rollbackPending); + this.bucketToHandles = new HashMap<>(); } public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, @@ -111,7 +124,23 @@ public List upsert(List> records, String instantTim getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT); - HoodieWriteMetadata> result = table.upsert(context, instantTime, records); + final HoodieRecord record = records.get(0); + final HoodieRecordLocation loc = record.getCurrentLocation(); + final String fileID = loc.getFileId(); + final boolean isInsert = loc.getInstantTime().equals("I"); + final HoodieWriteHandle writeHandle; + if (bucketToHandles.containsKey(fileID)) { + writeHandle = bucketToHandles.get(fileID); + } else { + // create the write handle if not exists + writeHandle = isInsert + ? new FlinkCreateHandle<>(getConfig(), instantTime, table, record.getPartitionPath(), + fileID, table.getTaskContextSupplier()) + : new FlinkMergeHandle<>(getConfig(), instantTime, table, records.listIterator(), record.getPartitionPath(), + fileID, table.getTaskContextSupplier()); + bucketToHandles.put(fileID, writeHandle); + } + HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsert(context, writeHandle, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); } @@ -202,6 +231,17 @@ protected HoodieTable>, List, List { + ((MiniBatchHandle) handle).finishWrite(); + }); + this.bucketToHandles.clear(); + } + private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { if (operationType == WriteOperationType.DELETE) { setWriteSchemaForDeletes(metaClient); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java similarity index 78% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java index d65663e639e3d..f2847e22858f8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java @@ -24,18 +24,21 @@ import org.apache.hudi.table.HoodieTable; /** - * Create handle factory for Flink writer, use the specified fileID directly - * because it is unique anyway. + * Create handle factory for Flink writer, use the specified write handle directly. 
*/ -public class FlinkCreateHandleFactory +public class ExplicitCreateHandleFactory extends CreateHandleFactory { + private HoodieWriteHandle writeHandle; + + public ExplicitCreateHandleFactory(HoodieWriteHandle writeHandle) { + this.writeHandle = writeHandle; + } @Override public HoodieWriteHandle create( HoodieWriteConfig hoodieConfig, String commitTime, HoodieTable hoodieTable, String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier) { - return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, - fileIdPrefix, taskContextSupplier); + return writeHandle; } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java new file mode 100644 index 0000000000000..07a71969ffa5a --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches). + * + *

For the first mini-batch, it initialize and set up the next file path to write, + * but does not close the file writer until all the mini-batches write finish. Each mini-batch + * data are appended to the same file. + * + * @param Payload type + * @param Input type + * @param Key type + * @param Output type + */ +public class FlinkCreateHandle + extends HoodieCreateHandle implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class); + private long lastFileSize = 0L; + + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config), + taskContextSupplier); + } + + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, Pair writerSchemaIncludingAndExcludingMetadataPair, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair, + taskContextSupplier); + } + + /** + * Called by the compactor code path. + */ + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, Map> recordMap, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier); + } + + /** + * Get the incremental write status. In mini-batch write mode, + * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches), + * thus, after a mini-batch append finish, we do not close the underneath writer but return + * the incremental WriteStatus instead. + * + * @return the incremental write status + */ + private WriteStatus getIncrementalWriteStatus() { + try { + long fileSizeInBytes = FSUtils.getFileSize(fs, path); + setUpWriteStatus(fileSizeInBytes); + // reset the write status + recordsWritten = 0; + recordsDeleted = 0; + insertRecordsWritten = 0; + this.lastFileSize = fileSizeInBytes; + writeStatus.setTotalErrorRecords(0); + return writeStatus; + } catch (IOException e) { + throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); + } + } + + /** + * Set up the write status. 
+ * + * @param fileSizeInBytes File size in bytes + * @throws IOException if error occurs + */ + private void setUpWriteStatus(long fileSizeInBytes) throws IOException { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(writeStatus.getPartitionPath()); + stat.setNumWrites(recordsWritten); + stat.setNumDeletes(recordsDeleted); + stat.setNumInserts(insertRecordsWritten); + stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); + stat.setFileId(writeStatus.getFileId()); + stat.setPath(new Path(config.getBasePath()), path); + stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize); + stat.setFileSizeInBytes(fileSizeInBytes); + stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); + runtimeStats.setTotalCreateTime(timer.endTimer()); + stat.setRuntimeStats(runtimeStats); + timer = new HoodieTimer().startTimer(); + writeStatus.setStat(stat); + } + + public void finishWrite() { + LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); + try { + fileWriter.close(); + } catch (IOException e) { + throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); + } + } + + /** + * Performs actions to durably, persist the current changes and returns a WriteStatus object. + */ + @Override + public List close() { + return Collections.singletonList(getIncrementalWriteStatus()); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java new file mode 100644 index 0000000000000..cfd17295b0971 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers). + * + *

For a new data buffer, it initialize and set up the next file path to write, + * and closes the file path when the data buffer write finish. When next data buffer + * write starts, it rolls over to another new file. If all the data buffers write finish + * for a checkpoint round, it renames the last new file path as the desired file name + * (name with the expected file ID). + * + * @param Payload type + * @param Input type + * @param Key type + * @param Output type + */ +public class FlinkMergeHandle + extends HoodieMergeHandle + implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class); + + /** + * Records the current file handles number that rolls over. + */ + private int rollNumber = 0; + /** + * Records the rolled over file paths. + */ + private List rolloverPaths; + /** + * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet. + */ + private boolean needBootStrap = true; + + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); + rolloverPaths = new ArrayList<>(); + } + + /** + * Called by compactor code path. + */ + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Map> keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier); + } + + /** + * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + */ + protected String generatesDataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension()); + } + + public boolean isNeedBootStrap() { + return needBootStrap; + } + + @Override + public List close() { + List writeStatus = super.close(); + this.needBootStrap = false; + return writeStatus; + } + + /** + * The difference with the parent method is that there is no need to set up + * locations for the records. + * + * @param fileId The file ID + * @param newRecordsItr The incremental records iterator + */ + @Override + protected void init(String fileId, Iterator> newRecordsItr) { + initializeIncomingRecordsMap(); + while (newRecordsItr.hasNext()) { + HoodieRecord record = newRecordsItr.next(); + // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist + keyToNewRecords.put(record.getRecordKey(), record); + } + LOG.info(String.format("Number of entries in MemoryBasedMap => %d\n" + + "Total size in bytes of MemoryBasedMap => %d\n" + + "Number of entries in DiskBasedMap => %d\n" + + "Size of file spilled to disk => %d", + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(), + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes())); + } + + /** + * + * Rolls over the write handle to prepare for the next batch write. + * + *

It tweaks the handle state as follows:
+ *
+ *   • Increment the {@code rollNumber};
+ *   • Book-keep the last file path; these files (except the last one) are temporary and need to be cleaned;
+ *   • Make the last new file path the old file path;
+ *   • Initialize the new file path and the file writer.
+ *
+ * + * @param newRecordsItr The records iterator to update + */ + public void rollOver(Iterator> newRecordsItr) { + init(this.fileId, newRecordsItr); + this.recordsWritten = 0; + this.recordsDeleted = 0; + this.updatedRecordsWritten = 0; + this.insertRecordsWritten = 0; + this.writeStatus.setTotalErrorRecords(0); + this.timer = new HoodieTimer().startTimer(); + + rollNumber++; + + rolloverPaths.add(newFilePath); + oldFilePath = newFilePath; + // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + String newFileName = generatesDataFileName(); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + newFilePath = new Path(config.getBasePath(), relativePath); + + try { + fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); + } catch (IOException e) { + throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e); + } + + LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), + newFilePath.toString())); + } + + public void finishWrite() { + for (int i = 0; i < rolloverPaths.size() - 1; i++) { + Path path = rolloverPaths.get(i); + try { + fs.delete(path, false); + } catch (IOException e) { + throw new HoodieIOException("Error when clean the temporary roll file: " + path, e); + } + } + Path lastPath = rolloverPaths.size() > 0 + ? rolloverPaths.get(rolloverPaths.size() - 1) + : newFilePath; + String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + final Path desiredPath = new Path(config.getBasePath(), relativePath); + try { + fs.rename(lastPath, desiredPath); + } catch (IOException e) { + throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java new file mode 100644 index 0000000000000..2cae807a9ce3b --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +/** + * Hoodie write handle that supports write as mini-batch. + */ +public interface MiniBatchHandle { + /** + * Finish the write of multiple mini-batches. Usually these mini-bathes + * come from a checkpoint interval. 
+ */ + void finishWrite(); +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java new file mode 100644 index 0000000000000..c0699ff8e2813 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +/** + * HoodieTable that need to pass in the + * {@link org.apache.hudi.io.HoodieWriteHandle} explicitly. + */ +public interface ExplicitWriteHandleTable { + /** + * Upsert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> upsert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records); + + /** + * Insert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> insert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records); + + /** + * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be + * de-duped and non existent keys will be removed before deleting. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> delete( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List keys); + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> upsertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords); + + /** + * Inserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> insertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords); +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 4d7edd79d7baa..ddc3fbe277e79 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor; @@ -61,14 +62,117 @@ protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineCont super(config, context, metaClient); } + /** + * Upsert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> upsert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records) { + return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute(); + } + + /** + * Insert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> insert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records) { + return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute(); + } + + /** + * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be + * de-duped and non existent keys will be removed before deleting. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> delete( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List keys) { + return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute(); + } + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> upsertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords) { + return new FlinkUpsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); + } + + /** + * Inserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> insertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords) { + return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, List> records) { - return new FlinkUpsertCommitActionExecutor<>(context, config, this, instantTime, records).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override public HoodieWriteMetadata> insert(HoodieEngineContext context, String instantTime, List> records) { - return new FlinkInsertCommitActionExecutor<>(context, config, this, instantTime, records).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override @@ -81,7 +185,7 @@ public HoodieWriteMetadata> bulkInsert(HoodieEngineContext con @Override public HoodieWriteMetadata> delete(HoodieEngineContext context, String instantTime, List keys) { - return new FlinkDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override @@ -91,12 +195,12 @@ public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String @Override public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, String instantTime, List> preppedRecords) { - return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override public HoodieWriteMetadata> insertPrepped(HoodieEngineContext context, String instantTime, List> preppedRecords) { - return new FlinkInsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index a8e6ed1e077ad..6b7c4a69aee73 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -36,7 +36,8 @@ import java.util.List; public abstract class HoodieFlinkTable - extends HoodieTable>, List, List> { + extends HoodieTable>, List, List> + implements ExplicitWriteHandleTable { protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { super(config, context, metaClient); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 044f841d27615..9b6dcd6335c7c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -32,11 +32,14 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; -import org.apache.hudi.io.FlinkCreateHandleFactory; +import org.apache.hudi.io.ExplicitCreateHandleFactory; +import org.apache.hudi.io.FlinkMergeHandle; +import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.HoodieSortedMergeHandle; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -71,21 +74,26 @@ public abstract class BaseFlinkCommitActionExecutor writeHandle; + public BaseFlinkCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) { - super(context, config, table, instantTime, operationType, Option.empty()); + this(context, writeHandle, config, table, instantTime, operationType, Option.empty()); } public BaseFlinkCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option extraMetadata) { super(context, config, table, instantTime, operationType, extraMetadata); + this.writeHandle = writeHandle; } @Override @@ -182,6 +190,16 @@ protected Iterator> handleUpsertPartition( case INSERT: return handleInsert(fileIdHint, recordItr); case UPDATE: + if (this.writeHandle instanceof HoodieCreateHandle) { + // During one checkpoint interval, an insert record could also be updated, + // for example, for an operation sequence of a record: + // I, U, | U, U + // - batch1 - | - batch2 - + // the first batch(batch1) operation triggers an INSERT bucket, + // the second batch batch2 tries to reuse the same bucket + // and append instead of UPDATE. + return handleInsert(fileIdHint, recordItr); + } return handleUpdate(partitionPath, fileIdHint, recordItr); default: throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath); @@ -203,7 +221,7 @@ public Iterator> handleUpdate(String partitionPath, String fil return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); } // these are updates - HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr); + HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr); return handleUpdateInternal(upsertHandle, fileId); } @@ -225,11 +243,16 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { + protected FlinkMergeHandle getUpdateHandle(Iterator> recordItr) { if (table.requireSortedRecords()) { - return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); + throw new HoodieNotSupportedException("Sort records are not supported in Flink streaming write"); } else { - return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); + FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle; + // add the incremental records. 
+ if (!writeHandle.isNeedBootStrap()) { + writeHandle.rollOver(recordItr); + } + return writeHandle; } } @@ -242,6 +265,6 @@ public Iterator> handleInsert(String idPfx, Iterator) Collections.EMPTY_LIST).iterator(); } return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx, - taskContextSupplier, new FlinkCreateHandleFactory<>()); + taskContextSupplier, new ExplicitCreateHandleFactory<>(writeHandle)); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java index 4b46f7fe7d6fa..2064be3e67113 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -33,9 +34,12 @@ public class FlinkDeleteCommitActionExecutor> e private final List keys; public FlinkDeleteCommitActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, HoodieTable table, - String instantTime, List keys) { - super(context, config, table, instantTime, WriteOperationType.DELETE); + HoodieWriteHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List keys) { + super(context, writeHandle, config, table, instantTime, WriteOperationType.DELETE); this.keys = keys; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java index e0c47db26e484..041598314f343 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,11 +35,12 @@ public class FlinkInsertCommitActionExecutor> e private List> inputRecords; public FlinkInsertCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> inputRecords) { - super(context, config, table, instantTime, WriteOperationType.INSERT); + super(context, writeHandle, config, table, instantTime, WriteOperationType.INSERT); this.inputRecords = inputRecords; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java index 8e535475a465b..459a6dbc8f672 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,9 +35,10 @@ public class FlinkInsertPreppedCommitActionExecutor> preppedRecords; public FlinkInsertPreppedCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> preppedRecords) { - super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED); + super(context, writeHandle, config, table, instantTime, WriteOperationType.INSERT_PREPPED); this.preppedRecords = preppedRecords; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index d34aca22f049c..539f551c92585 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -64,13 +65,15 @@ public void runMerge(HoodieTable>, List, List HoodieMergeHandle>, List, List> upsertHandle) throws IOException { final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); - HoodieMergeHandle>, List, List> mergeHandle = upsertHandle; + FlinkMergeHandle>, List, List> mergeHandle = + (FlinkMergeHandle>, List, List>) upsertHandle; HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); final GenericDatumWriter gWriter; final GenericDatumReader gReader; Schema readSchema; - if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { + if (mergeHandle.isNeedBootStrap() + && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) { readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields()); @@ -84,7 +87,7 @@ public void runMerge(HoodieTable>, List, List HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; - if (baseFile.getBootstrapBaseFile().isPresent()) { + if (mergeHandle.isNeedBootStrap() && baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { readerIterator = 
reader.getRecordIterator(readSchema); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java index 7842e259f6473..5859bb585fcd9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,11 +35,12 @@ public class FlinkUpsertCommitActionExecutor> e private List> inputRecords; public FlinkUpsertCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> inputRecords) { - super(context, config, table, instantTime, WriteOperationType.UPSERT); + super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT); this.inputRecords = inputRecords; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java index a6ecd93199ec2..42d932a2bd7f9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,9 +35,10 @@ public class FlinkUpsertPreppedCommitActionExecutor> preppedRecords; public FlinkUpsertPreppedCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> preppedRecords) { - super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED); + super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT_PREPPED); this.preppedRecords = preppedRecords; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java index 655fd1aeff6f2..f163b02f6a4bd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java @@ -159,6 +159,12 @@ private FlinkOptions() { .defaultValue(4) .withDescription("Parallelism of tasks that do actual write, default is 4"); + public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions + .key("write.batch.size.MB") + .doubleType() + .defaultValue(128D) // 128MB + .withDescription("Batch buffer size in MB to flush data into the underneath filesystem"); + // 
------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java index 4309bb008b5a3..d7c0256b7feea 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java @@ -34,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -50,7 +51,9 @@ /** * A {@link KeyedProcessFunction} where the write operations really happens. */ -public class KeyedWriteProcessFunction extends KeyedProcessFunction, Integer>> implements CheckpointedFunction { +public class KeyedWriteProcessFunction + extends KeyedProcessFunction, Integer>> + implements CheckpointedFunction, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class); /** @@ -160,6 +163,11 @@ public void processElement(HoodieRecord hoodieRecord, Context context, Collector putDataIntoBuffer(hoodieRecord); } + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + this.writeClient.cleanHandles(); + } + public boolean hasRecordsIn() { return hasRecordsIn; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java index 58770982418a8..ba6cea5614efa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; import org.apache.hudi.table.action.commit.FlinkWriteHelper; @@ -33,6 +34,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -48,6 +50,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; @@ -58,33 +61,39 @@ *

Work Flow

* *

The function firstly buffers the data as a batch of {@link HoodieRecord}s, - * It flushes(write) the records batch when a Flink checkpoint starts. After a batch has been written successfully, + * It flushes(write) the records batch when a batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * or a Flink checkpoint starts. After a batch has been written successfully, * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. * - *

Exactly-once Semantics

+ *

The Semantics

* *

The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists. - * The function process thread then block data buffering and the checkpoint thread starts flushing the existing data buffer. - * When the existing data buffer write successfully, the process thread unblock and start buffering again for the next round checkpoint. - * Because any checkpoint failures would trigger the write rollback, it implements the exactly-once semantics. + * + *

In order to improve the throughput, The function process thread does not block data buffering + * after the checkpoint thread starts flushing the existing data buffer. So there is possibility that the next checkpoint + * batch was written to current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records + * (e.g. the eager write batch), the semantics is still correct using the UPSERT operation. * *

Fault Tolerance

* - *

The operator coordinator checks the validity for the last instant when it starts a new one. The operator rolls back - * the written data and throws when any error occurs. This means any checkpoint or task failure would trigger a failover. - * The operator coordinator would try several times when committing the writestatus. + *

The operator coordinator checks and commits the last instant then starts a new one when a checkpoint finished successfully. + * The operator rolls back the written data and throws to trigger a failover when any error occurs. + * This means one Hoodie instant may span one or more checkpoints(some checkpoints notifications may be skipped). + * If a checkpoint timed out, the next checkpoint would help to rewrite the left buffer data (clean the buffer in the last + * step of the #flushBuffer method). + * + *

The operator coordinator would try several times when committing the write status. * - *

Note: The function task requires the input stream be partitioned by the partition fields to avoid different write tasks - * write to the same file group that conflict. The general case for partition path is a datetime field, - * so the sink task is very possible to have IO bottleneck, the more flexible solution is to shuffle the - * data by the file group IDs. + *

Note: The function task requires the input stream be shuffled by the file IDs. * * @param Type of the input record * @see StreamWriteOperatorCoordinator */ -public class StreamWriteFunction extends KeyedProcessFunction implements CheckpointedFunction { +public class StreamWriteFunction + extends KeyedProcessFunction + implements CheckpointedFunction, CheckpointListener { private static final long serialVersionUID = 1L; @@ -137,10 +146,15 @@ public class StreamWriteFunction extends KeyedProcessFunction */ private transient OperatorEventGateway eventGateway; + /** + * The detector that tells if to flush the data as mini-batch. + */ + private transient BufferSizeDetector detector; + /** * Constructs a StreamingSinkFunction. * - * @param config The config options + * @param config The config options */ public StreamWriteFunction(Configuration config) { this.config = config; @@ -149,6 +163,7 @@ public StreamWriteFunction(Configuration config) { @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)); initBuffer(); initWriteClient(); initWriteFunction(); @@ -166,11 +181,8 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { // Based on the fact that the coordinator starts the checkpoint first, // it would check the validity. this.onCheckpointing = true; - this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); - Preconditions.checkNotNull(this.currentInstant, - "No inflight instant when flushing data"); // wait for the buffer data flush out and request a new instant - flushBuffer(); + flushBuffer(true); // signal the task thread to start buffering addToBufferCondition.signal(); } finally { @@ -186,6 +198,7 @@ public void processElement(I value, KeyedProcessFunction.Context ctx, C if (onCheckpointing) { addToBufferCondition.await(); } + flushBufferOnCondition(value); putDataIntoBuffer(value); } finally { bufferLock.unlock(); @@ -199,6 +212,11 @@ public void close() { } } + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.writeClient.cleanHandles(); + } + // ------------------------------------------------------------------------- // Getter/Setter // ------------------------------------------------------------------------- @@ -252,6 +270,42 @@ private void initWriteFunction() { } } + /** + * Tool to detect if to flush out the existing buffer. + * Sampling the record to compute the size with 0.01 percentage. 
+ */ + private static class BufferSizeDetector { + private final Random random = new Random(47); + private static final int DENOMINATOR = 100; + + private final double batchSizeBytes; + + private long lastRecordSize = -1L; + private long totalSize = 0L; + + BufferSizeDetector(double batchSizeMb) { + this.batchSizeBytes = batchSizeMb * 1024 * 1024; + } + + boolean detect(Object record) { + if (lastRecordSize == -1 || sampling()) { + lastRecordSize = ObjectSizeCalculator.getObjectSize(record); + } + totalSize += lastRecordSize; + return totalSize > this.batchSizeBytes; + } + + boolean sampling() { + // 0.01 sampling percentage + return random.nextInt(DENOMINATOR) == 1; + } + + void reset() { + this.lastRecordSize = -1L; + this.totalSize = 0L; + } + } + private void putDataIntoBuffer(I value) { HoodieRecord record = (HoodieRecord) value; final String fileId = record.getCurrentLocation().getFileId(); @@ -262,8 +316,25 @@ private void putDataIntoBuffer(I value) { this.buffer.get(key).add(record); } + /** + * Flush the data buffer if the buffer size is greater than + * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. + * + * @param value HoodieRecord + */ + private void flushBufferOnCondition(I value) { + boolean needFlush = this.detector.detect(value); + if (needFlush) { + flushBuffer(false); + this.detector.reset(); + } + } + @SuppressWarnings("unchecked, rawtypes") - private void flushBuffer() { + private void flushBuffer(boolean isFinalBatch) { + this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); + Preconditions.checkNotNull(this.currentInstant, + "No inflight instant when flushing data"); final List writeStatus; if (buffer.size() > 0) { writeStatus = new ArrayList<>(); @@ -278,12 +349,13 @@ private void flushBuffer() { writeStatus.addAll(writeFunction.apply(records, currentInstant)); } }); - this.buffer.clear(); } else { LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); } - this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus)); + this.eventGateway.sendEventToCoordinator( + new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch)); + this.buffer.clear(); this.currentInstant = ""; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index bf0cfc27e91e5..bd933c212f0fa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -80,12 +80,15 @@ public class StreamWriteOperatorCoordinator */ private transient HoodieFlinkWriteClient writeClient; + /** + * Current data buffering checkpoint. + */ private long inFlightCheckpoint = -1; /** * Current REQUESTED instant, for validation. */ - private String inFlightInstant = ""; + private String instant = ""; /** * Event buffer for one round of checkpointing. When all the elements are non-null and have the same @@ -119,6 +122,8 @@ public void start() throws Exception { initWriteClient(); // init table, create it if not exists. 
initTableIfNotExists(this.conf); + // start a new instant + startInstant(); } @Override @@ -132,20 +137,14 @@ public void close() { @Override public void checkpointCoordinator(long checkpointId, CompletableFuture result) { try { - final String errMsg = "A new checkpoint starts while the last checkpoint buffer" - + " data has not finish writing, roll back the last write and throw"; - checkAndForceCommit(errMsg); - this.inFlightInstant = this.writeClient.startCommit(); - this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); this.inFlightCheckpoint = checkpointId; - LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId); result.complete(writeCheckpointBytes()); } catch (Throwable throwable) { // when a checkpoint fails, throws directly. result.completeExceptionally( new CompletionException( String.format("Failed to checkpoint Instant %s for source %s", - this.inFlightInstant, this.getClass().getSimpleName()), throwable)); + this.instant, this.getClass().getSimpleName()), throwable)); } } @@ -153,6 +152,15 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r public void checkpointComplete(long checkpointId) { // start to commit the instant. checkAndCommitWithRetry(); + // start new instant. + startInstant(); + } + + private void startInstant() { + this.instant = this.writeClient.startCommit(); + this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant); + LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, + this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } public void notifyCheckpointAborted(long checkpointId) { @@ -175,10 +183,14 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent, "The coordinator can only handle BatchWriteSuccessEvent"); BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; - Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant), + Preconditions.checkState(event.getInstantTime().equals(this.instant), String.format("Receive an unexpected event for instant %s from task %d", event.getInstantTime(), event.getTaskID())); - this.eventBuffer[event.getTaskID()] = event; + if (this.eventBuffer[event.getTaskID()] != null) { + this.eventBuffer[event.getTaskID()].mergeWith(event); + } else { + this.eventBuffer[event.getTaskID()] = event; + } } @Override @@ -218,7 +230,7 @@ private byte[] writeCheckpointBytes() throws IOException { DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { out.writeLong(this.inFlightCheckpoint); - byte[] serializedInstant = this.inFlightInstant.getBytes(); + byte[] serializedInstant = this.instant.getBytes(); out.writeInt(serializedInstant.length); out.write(serializedInstant); out.flush(); @@ -239,12 +251,12 @@ private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception { int serializedInstantSize = in.readInt(); byte[] serializedInstant = readBytes(in, serializedInstantSize); this.inFlightCheckpoint = checkpointID; - this.inFlightInstant = new String(serializedInstant); + this.instant = new String(serializedInstant); } } private void reset() { - this.inFlightInstant = ""; + this.instant = ""; this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; } @@ -253,8 +265,8 @@ private void checkAndForceCommit(String errMsg) { // forced but still has inflight instant String 
inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE)); if (inflightInstant != null) { - assert inflightInstant.equals(this.inFlightInstant); - writeClient.rollback(this.inFlightInstant); + assert inflightInstant.equals(this.instant); + writeClient.rollback(this.instant); throw new HoodieException(errMsg); } if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { @@ -277,6 +289,10 @@ private void checkAndCommitWithRetry() { if (!checkReady()) { // Do not throw if the try times expires but the event buffer are still not ready, // because we have a force check when next checkpoint starts. + if (tryTimes == retryTimes) { + // Throw if the try times expires but the event buffer are still not ready + throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed"); + } sleepFor(retryIntervalMillis); continue; } @@ -284,9 +300,9 @@ private void checkAndCommitWithRetry() { return; } catch (Throwable throwable) { String cause = throwable.getCause() == null ? "" : throwable.getCause().toString(); - LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause); + LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause); if (tryTimes == retryTimes) { - throw new HoodieException(throwable); + throw new HoodieException("Not all write tasks finish the batch write to commit", throwable); } sleepFor(retryIntervalMillis); } @@ -307,8 +323,8 @@ private void sleepFor(long intervalMillis) { /** Checks the buffer is ready to commit. */ private boolean checkReady() { - return Arrays.stream(eventBuffer).allMatch(event -> - event != null && event.getInstantTime().equals(this.inFlightInstant)); + return Arrays.stream(eventBuffer) + .allMatch(event -> event != null && event.isReady(this.instant)); } /** Performs the actual commit action. */ @@ -320,7 +336,7 @@ private void doCommit() { if (writeResults.size() == 0) { // No data has written, clear the metadata file - this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); + this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant); reset(); return; } @@ -337,12 +353,12 @@ private void doCommit() { + totalErrorRecords + "/" + totalRecords); } - boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata)); + boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata)); if (success) { reset(); - LOG.info("Commit instant [{}] success!", this.inFlightInstant); + LOG.info("Commit instant [{}] success!", this.instant); } else { - throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant)); + throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant)); } } else { LOG.error("Error when writing. 
Errors/Total=" + totalErrorRecords + "/" + totalRecords); @@ -355,8 +371,8 @@ private void doCommit() { } }); // Rolls back instant - writeClient.rollback(this.inFlightInstant); - throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant)); + writeClient.rollback(this.instant); + throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant)); } } @@ -366,8 +382,14 @@ public BatchWriteSuccessEvent[] getEventBuffer() { } @VisibleForTesting - public String getInFlightInstant() { - return inFlightInstant; + public String getInstant() { + return instant; + } + + @VisibleForTesting + @SuppressWarnings("rawtypes") + public HoodieFlinkWriteClient getWriteClient() { + return writeClient; } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java index db5432e1e9244..f03e8d3e27f00 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java @@ -21,7 +21,9 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.ValidationUtils; +import java.util.ArrayList; import java.util.List; /** @@ -30,17 +32,38 @@ public class BatchWriteSuccessEvent implements OperatorEvent { private static final long serialVersionUID = 1L; - private final List writeStatuses; + private List writeStatuses; private final int taskID; private final String instantTime; + private boolean isLastBatch; public BatchWriteSuccessEvent( int taskID, String instantTime, List writeStatuses) { + this(taskID, instantTime, writeStatuses, false); + } + + /** + * Creates an event. + * + * @param taskID The task ID + * @param instantTime The instant time under which to write the data + * @param writeStatuses The write statues list + * @param isLastBatch Whether the event reports the last batch + * within an checkpoint interval, + * if true, the whole data set of the checkpoint + * has been flushed successfully + */ + public BatchWriteSuccessEvent( + int taskID, + String instantTime, + List writeStatuses, + boolean isLastBatch) { this.taskID = taskID; this.instantTime = instantTime; - this.writeStatuses = writeStatuses; + this.writeStatuses = new ArrayList<>(writeStatuses); + this.isLastBatch = isLastBatch; } public List getWriteStatuses() { @@ -54,4 +77,28 @@ public int getTaskID() { public String getInstantTime() { return instantTime; } + + public boolean isLastBatch() { + return isLastBatch; + } + + /** + * Merges this event with given {@link BatchWriteSuccessEvent} {@code other}. + * + * @param other The event to be merged + */ + public void mergeWith(BatchWriteSuccessEvent other) { + ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime)); + ValidationUtils.checkArgument(this.taskID == other.taskID); + this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true. + List statusList = new ArrayList<>(); + statusList.addAll(this.writeStatuses); + statusList.addAll(other.writeStatuses); + this.writeStatuses = statusList; + } + + /** Returns whether the event is ready to commit. 
*/ + public boolean isReady(String currentInstant) { + return isLastBatch && this.instantTime.equals(currentInstant); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java index 269ccc801592a..590ee3be84356 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java @@ -89,7 +89,7 @@ public void open(Configuration parameters) throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) { - this.bucketAssigner.reset(); + // no operation } @Override @@ -144,6 +144,7 @@ public void processElement(I value, Context ctx, Collector out) throws Except @Override public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. + this.bucketAssigner.reset(); this.bucketAssigner.refreshTable(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4447705a9012f..1c402849331f7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; @@ -53,6 +54,7 @@ import java.io.IOException; import java.io.StringReader; import java.util.List; +import java.util.Objects; import java.util.Properties; /** @@ -293,4 +295,9 @@ public static void initTableIfNotExists(Configuration conf) throws IOException { public static String generateBucketKey(String partitionPath, String fileId) { return String.format("%s_%s", partitionPath, fileId); } + + /** Returns whether the location represents an insert. 
*/ + public static boolean isInsert(HoodieRecordLocation loc) { + return Objects.equals(loc.getInstantTime(), "I"); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java index fea9b8f92feb0..12a00dd8eaf1b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -150,7 +151,8 @@ public void testCheckpoint() throws Exception { assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); funcWrapper.checkpointComplete(2); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null); + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); } @@ -187,12 +189,13 @@ public void testCheckpointFails() throws Exception { funcWrapper.invoke(rowData); } - // this triggers the data write and event send - funcWrapper.checkpointFunction(2); - // Do not sent the write event and fails the checkpoint - assertThrows(HoodieException.class, - () -> funcWrapper.checkpointFails(2), - "The last checkpoint was aborted, roll back the last write and throw"); + // this triggers NPE cause there is no inflight instant + assertThrows(NullPointerException.class, + () -> funcWrapper.checkpointFunction(2), + "No inflight instant when flushing data"); + // do not sent the write event and fails the checkpoint, + // behaves like the last checkpoint is successful. + funcWrapper.checkpointFails(2); } @Test @@ -212,13 +215,13 @@ public void testInsert() throws Exception { final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED1); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); funcWrapper.checkpointComplete(1); + checkWrittenData(tempFile, EXPECTED1); // the coordinator checkpoint commits the inflight instant. 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED1); @@ -241,15 +244,16 @@ public void testInsertDuplicates() throws Exception { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - final OperatorEvent nextEvent = funcWrapper.getNextEvent(); + OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED3, 1); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); funcWrapper.checkpointComplete(1); + checkWrittenData(tempFile, EXPECTED3, 1); + // insert duplicates again for (RowData rowData : TestData.DATA_SET_THREE) { funcWrapper.invoke(rowData); @@ -257,6 +261,10 @@ public void testInsertDuplicates() throws Exception { funcWrapper.checkpointFunction(2); + nextEvent = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + funcWrapper.checkpointComplete(2); + checkWrittenData(tempFile, EXPECTED3, 1); } @@ -306,10 +314,84 @@ public void testUpsert() throws Exception { checkWrittenData(tempFile, EXPECTED2); } + @Test + public void testInsertWithMiniBatches() throws Exception { + // reset the config option + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); + assertThat("2 records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(3)); + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event2 = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, event1); + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getInflightAndRequestedInstant("COPY_ON_WRITE"); + + funcWrapper.checkpointComplete(1); + + Map expected = new HashMap<>(); + expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1]"); + checkWrittenData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + 
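Editor's note: the second checkpoint in this test produces three events (two mini-batch flushes plus the checkpoint-triggered flush), and the coordinator accumulates them per task rather than overwriting the buffer slot. The following is a minimal sketch of that accumulation using the BatchWriteSuccessEvent API introduced in this patch; the class name EventMergeSketch and the instant value are made up for illustration only and are not part of the patch.

// Editor's sketch (not part of the patch): how per-task events accumulate for one instant.
import org.apache.hudi.operator.event.BatchWriteSuccessEvent;

import java.util.Collections;

public class EventMergeSketch {
  public static void main(String[] args) {
    String instant = "20210101000000"; // hypothetical instant time, for illustration only
    // Two flushes from task 0: a mini-batch flush and the checkpoint-triggered flush,
    // where only the latter is flagged as the last batch of the checkpoint interval.
    BatchWriteSuccessEvent buffered =
        new BatchWriteSuccessEvent(0, instant, Collections.emptyList(), false);
    BatchWriteSuccessEvent lastBatch =
        new BatchWriteSuccessEvent(0, instant, Collections.emptyList(), true);

    // The coordinator merges instead of overwriting, so write statuses of earlier
    // mini-batches are kept and the isLastBatch flag is OR-ed together.
    buffered.mergeWith(lastBatch);

    // The event buffer is ready to commit only after the last batch of the instant arrived.
    System.out.println(buffered.isReady(instant)); // prints: true
  }
}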
+ final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event4 = funcWrapper.getNextEvent(); + final OperatorEvent event5 = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, event3); + funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + funcWrapper.getCoordinator().handleEventFromOperator(0, event5); + funcWrapper.checkpointComplete(2); + + // Same the original base file content. + checkWrittenData(tempFile, expected, 1); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- + @SuppressWarnings("rawtypes") + private void checkInflightInstant(HoodieFlinkWriteClient writeClient) { + final String instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE"); + assertNotNull(instant); + } + @SuppressWarnings("rawtypes") private void checkInstantState( HoodieFlinkWriteClient writeClient, diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java index c533b48e5a296..732cc65be0582 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java @@ -18,8 +18,11 @@ package org.apache.hudi.operator; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.util.StreamerUtil; @@ -37,6 +40,9 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -61,6 +67,34 @@ public void after() { coordinator.close(); } + @Test + void testInstantState() { + String instant = coordinator.getInstant(); + assertNotEquals("", instant); + + WriteStatus writeStatus = new WriteStatus(true, 0.1D); + writeStatus.setPartitionPath("par1"); + writeStatus.setStat(new HoodieWriteStat()); + OperatorEvent event0 = + new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true); + + WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); + writeStatus1.setPartitionPath("par2"); + writeStatus1.setStat(new HoodieWriteStat()); + OperatorEvent event1 = + new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true); + coordinator.handleEventFromOperator(0, event0); + coordinator.handleEventFromOperator(1, event1); + + coordinator.checkpointComplete(1); + String inflight = coordinator.getWriteClient() + .getInflightAndRequestedInstant("COPY_ON_WRITE"); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE"); + assertThat("Instant should be complete", lastCompleted, is(instant)); + assertNotEquals("", inflight, "Should start a new instant"); + assertNotEquals(instant, inflight, "Should start a new instant"); + } + @Test 
public void testTableInitialized() throws IOException { final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); @@ -88,14 +122,14 @@ public void testReceiveInvalidEvent() { } @Test - public void testCheckpointInvalid() { + public void testCheckpointCompleteWithRetry() { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - String inflightInstant = coordinator.getInFlightInstant(); + String inflightInstant = coordinator.getInstant(); OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList()); coordinator.handleEventFromOperator(0, event); - final CompletableFuture future2 = new CompletableFuture<>(); - coordinator.checkpointCoordinator(2, future2); - assertTrue(future2.isCompletedExceptionally()); + assertThrows(HoodieException.class, + () -> coordinator.checkpointComplete(1), + "Try 3 to commit instant"); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java index 59de2832664c0..6f7062c6fc019 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java @@ -38,6 +38,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -77,11 +79,11 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E this.conf = conf; // one function this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); - this.coordinator.start(); this.functionInitializationContext = new MockFunctionInitializationContext(); } public void openFunction() throws Exception { + this.coordinator.start(); toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); toHoodieFunction.setRuntimeContext(runtimeContext); toHoodieFunction.open(conf); @@ -123,6 +125,10 @@ public OperatorEvent getNextEvent() { return this.gateway.getNextEvent(); } + public Map> getDataBuffer() { + return this.writeFunction.getBuffer(); + } + @SuppressWarnings("rawtypes") public HoodieFlinkWriteClient getWriteClient() { return this.writeFunction.getWriteClient(); @@ -141,6 +147,7 @@ public void checkpointComplete(long checkpointId) { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.checkpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); + this.writeFunction.notifyCheckpointComplete(checkpointId); } public void checkpointFails(long checkpointId) {