From a7d7ee67d9661d5cf74a235894c9cc9e1a449aad Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Mon, 8 Feb 2021 15:29:10 +0800
Subject: [PATCH] [HUDI-1598] Write as mini-batches during one checkpoint interval for the new writer

This is step 3 of RFC-24: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+24%3A+Hoodie+Flink+Writer+Proposal

Buffering data during one checkpoint interval and flushing the buffer out all at once is not resource friendly for streaming writes. A more proper way is to cut the batches based on their real in-memory data buffer size (say, 128MB): the writer always flushes the buffer out when its size reaches the configured threshold. Thus, after this change, one instant may span one (if every checkpoint succeeds) or more (if there are checkpoint failures) checkpoints. The instant is only committed when a checkpoint succeeds.

Changes:
- Modify the BaseFlinkCommitActionExecutor to keep the write handles explicitly, so the write task can decide when and how to roll over the underlying file handles
- The StreamWriteOperatorCoordinator now commits the old instant and creates a new one only if the checkpoint succeeds, so failed checkpoints can simply be skipped
---
 .../apache/hudi/io/HoodieCreateHandle.java | 10 +-
 .../org/apache/hudi/io/HoodieMergeHandle.java | 28 ++-
 .../hudi/client/HoodieFlinkWriteClient.java | 44 +++-
 ....java => ExplicitCreateHandleFactory.java} | 13 +-
 .../org/apache/hudi/io/FlinkCreateHandle.java | 148 +++++++++++++
 .../org/apache/hudi/io/FlinkMergeHandle.java | 202 ++++++++++++++++++
 .../org/apache/hudi/io/MiniBatchHandle.java | 30 +++
 .../hudi/table/ExplicitWriteHandleTable.java | 128 +++++++++++
 .../table/HoodieFlinkCopyOnWriteTable.java | 114 +++++++++-
 .../apache/hudi/table/HoodieFlinkTable.java | 3 +-
 .../commit/BaseFlinkCommitActionExecutor.java | 39 +++-
 .../FlinkDeleteCommitActionExecutor.java | 10 +-
 .../FlinkInsertCommitActionExecutor.java | 4 +-
 ...linkInsertPreppedCommitActionExecutor.java | 4 +-
 .../table/action/commit/FlinkMergeHelper.java | 9 +-
 .../FlinkUpsertCommitActionExecutor.java | 4 +-
 ...linkUpsertPreppedCommitActionExecutor.java | 4 +-
 .../apache/hudi/operator/FlinkOptions.java | 6 +
 .../operator/KeyedWriteProcessFunction.java | 10 +-
 .../hudi/operator/StreamWriteFunction.java | 114 ++++++++--
 .../StreamWriteOperatorCoordinator.java | 76 ++++---
 .../event/BatchWriteSuccessEvent.java | 51 ++++-
 .../partitioner/BucketAssignFunction.java | 3 +-
 .../org/apache/hudi/util/StreamerUtil.java | 7 +
 .../operator/StreamWriteFunctionTest.java | 102 ++++++++-
 .../StreamWriteOperatorCoordinatorTest.java | 44 +++-
 .../utils/StreamWriteFunctionWrapper.java | 9 +-
 27 files changed, 1105 insertions(+), 111 deletions(-)
 rename hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/{FlinkCreateHandleFactory.java => ExplicitCreateHandleFactory.java} (78%)
 create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java
 create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
 create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java
 create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 8fbd1be247217..357cf1b3bedee 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -53,11 +53,11 @@ public class HoodieCreateHandle extends private static final Logger LOG = LogManager.getLogger(HoodieCreateHandle.class); - private final HoodieFileWriter fileWriter; - private final Path path; - private long recordsWritten = 0; - private long insertRecordsWritten = 0; - private long recordsDeleted = 0; + protected final HoodieFileWriter fileWriter; + protected final Path path; + protected long recordsWritten = 0; + protected long insertRecordsWritten = 0; + protected long recordsDeleted = 0; private Map> recordMap; private boolean useWriterSchema = false; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index d5df2cb226665..0716050b7b02e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -95,10 +95,10 @@ public class HoodieMergeHandle extends H protected HoodieFileWriter fileWriter; protected Path newFilePath; - private Path oldFilePath; + protected Path oldFilePath; protected long recordsWritten = 0; - private long recordsDeleted = 0; - private long updatedRecordsWritten = 0; + protected long recordsDeleted = 0; + protected long updatedRecordsWritten = 0; protected long insertRecordsWritten = 0; protected boolean useWriterSchema; private HoodieBaseFile baseFileToMerge; @@ -132,6 +132,13 @@ public Schema getWriterSchema() { return writerSchema; } + /** + * Returns the data file name. + */ + protected String generatesDataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + } + /** * Extract old file path, initialize StorageWriter and WriteStatus. */ @@ -149,7 +156,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo partitionMetadata.trySave(getPartitionId()); oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath); - String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String newFileName = generatesDataFileName(); String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + newFileName).toString(); newFilePath = new Path(config.getBasePath(), relativePath); @@ -177,18 +184,25 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo } /** - * Load the new incoming records in a map and return partitionPath. + * Initialize a spillable map for incoming records. 
*/ - private void init(String fileId, Iterator> newRecordsItr) { + protected void initializeIncomingRecordsMap() { try { // Load the new records in a map long memoryForMerge = IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps()); LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), - new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); + new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(writerSchema)); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } + } + + /** + * Load the new incoming records in a map and return partitionPath. + */ + protected void init(String fileId, Iterator> newRecordsItr) { + initializeIncomingRecordsMap(); while (newRecordsItr.hasNext()) { HoodieRecord record = newRecordsItr.next(); // update the new location of the record, so we know where to find it next diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 0c87f7df9f308..9a7f5e8e4acbf 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; @@ -39,6 +40,10 @@ import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndex; import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.io.FlinkCreateHandle; +import org.apache.hudi.io.FlinkMergeHandle; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.io.MiniBatchHandle; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; @@ -50,6 +55,7 @@ import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -58,12 +64,19 @@ public class HoodieFlinkWriteClient extends AbstractHoodieWriteClient>, List, List> { + /** + * FileID to write handle mapping in order to record the write handles for each file group, + * so that we can append the mini-batch data buffer incrementally. 
+ */ + private Map> bucketToHandles; + public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { - super(context, clientConfig); + this(context, clientConfig, false); } public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) { super(context, writeConfig, rollbackPending); + this.bucketToHandles = new HashMap<>(); } public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, @@ -111,7 +124,23 @@ public List upsert(List> records, String instantTim getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT); - HoodieWriteMetadata> result = table.upsert(context, instantTime, records); + final HoodieRecord record = records.get(0); + final HoodieRecordLocation loc = record.getCurrentLocation(); + final String fileID = loc.getFileId(); + final boolean isInsert = loc.getInstantTime().equals("I"); + final HoodieWriteHandle writeHandle; + if (bucketToHandles.containsKey(fileID)) { + writeHandle = bucketToHandles.get(fileID); + } else { + // create the write handle if not exists + writeHandle = isInsert + ? new FlinkCreateHandle<>(getConfig(), instantTime, table, record.getPartitionPath(), + fileID, table.getTaskContextSupplier()) + : new FlinkMergeHandle<>(getConfig(), instantTime, table, records.listIterator(), record.getPartitionPath(), + fileID, table.getTaskContextSupplier()); + bucketToHandles.put(fileID, writeHandle); + } + HoodieWriteMetadata> result = ((HoodieFlinkTable) table).upsert(context, writeHandle, instantTime, records); if (result.getIndexLookupDuration().isPresent()) { metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); } @@ -202,6 +231,17 @@ protected HoodieTable>, List, List { + ((MiniBatchHandle) handle).finishWrite(); + }); + this.bucketToHandles.clear(); + } + private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { if (operationType == WriteOperationType.DELETE) { setWriteSchemaForDeletes(metaClient); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java similarity index 78% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java index d65663e639e3d..f2847e22858f8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandleFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/ExplicitCreateHandleFactory.java @@ -24,18 +24,21 @@ import org.apache.hudi.table.HoodieTable; /** - * Create handle factory for Flink writer, use the specified fileID directly - * because it is unique anyway. + * Create handle factory for Flink writer, use the specified write handle directly. 
*/ -public class FlinkCreateHandleFactory +public class ExplicitCreateHandleFactory extends CreateHandleFactory { + private HoodieWriteHandle writeHandle; + + public ExplicitCreateHandleFactory(HoodieWriteHandle writeHandle) { + this.writeHandle = writeHandle; + } @Override public HoodieWriteHandle create( HoodieWriteConfig hoodieConfig, String commitTime, HoodieTable hoodieTable, String partitionPath, String fileIdPrefix, TaskContextSupplier taskContextSupplier) { - return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath, - fileIdPrefix, taskContextSupplier); + return writeHandle; } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java new file mode 100644 index 0000000000000..07a71969ffa5a --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieCreateHandle} that supports create write incrementally(mini-batches). + * + *

For the first mini-batch, it initialize and set up the next file path to write, + * but does not close the file writer until all the mini-batches write finish. Each mini-batch + * data are appended to the same file. + * + * @param Payload type + * @param Input type + * @param Key type + * @param Output type + */ +public class FlinkCreateHandle + extends HoodieCreateHandle implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkCreateHandle.class); + private long lastFileSize = 0L; + + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) { + this(config, instantTime, hoodieTable, partitionPath, fileId, getWriterSchemaIncludingAndExcludingMetadataPair(config), + taskContextSupplier); + } + + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, Pair writerSchemaIncludingAndExcludingMetadataPair, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, writerSchemaIncludingAndExcludingMetadataPair, + taskContextSupplier); + } + + /** + * Called by the compactor code path. + */ + public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + String partitionPath, String fileId, Map> recordMap, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, partitionPath, fileId, recordMap, taskContextSupplier); + } + + /** + * Get the incremental write status. In mini-batch write mode, + * this handle would be reused for a checkpoint bucket(the bucket is appended as mini-batches), + * thus, after a mini-batch append finish, we do not close the underneath writer but return + * the incremental WriteStatus instead. + * + * @return the incremental write status + */ + private WriteStatus getIncrementalWriteStatus() { + try { + long fileSizeInBytes = FSUtils.getFileSize(fs, path); + setUpWriteStatus(fileSizeInBytes); + // reset the write status + recordsWritten = 0; + recordsDeleted = 0; + insertRecordsWritten = 0; + this.lastFileSize = fileSizeInBytes; + writeStatus.setTotalErrorRecords(0); + return writeStatus; + } catch (IOException e) { + throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); + } + } + + /** + * Set up the write status. 
+ * + * @param fileSizeInBytes File size in bytes + * @throws IOException if error occurs + */ + private void setUpWriteStatus(long fileSizeInBytes) throws IOException { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(writeStatus.getPartitionPath()); + stat.setNumWrites(recordsWritten); + stat.setNumDeletes(recordsDeleted); + stat.setNumInserts(insertRecordsWritten); + stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); + stat.setFileId(writeStatus.getFileId()); + stat.setPath(new Path(config.getBasePath()), path); + stat.setTotalWriteBytes(fileSizeInBytes - lastFileSize); + stat.setFileSizeInBytes(fileSizeInBytes); + stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats(); + runtimeStats.setTotalCreateTime(timer.endTimer()); + stat.setRuntimeStats(runtimeStats); + timer = new HoodieTimer().startTimer(); + writeStatus.setStat(stat); + } + + public void finishWrite() { + LOG.info("Closing the file " + writeStatus.getFileId() + " as we are done with all the records " + recordsWritten); + try { + fileWriter.close(); + } catch (IOException e) { + throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e); + } + } + + /** + * Performs actions to durably, persist the current changes and returns a WriteStatus object. + */ + @Override + public List close() { + return Collections.singletonList(getIncrementalWriteStatus()); + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java new file mode 100644 index 0000000000000..cfd17295b0971 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.table.HoodieTable; + +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers). + * + *

For a new data buffer, it initialize and set up the next file path to write, + * and closes the file path when the data buffer write finish. When next data buffer + * write starts, it rolls over to another new file. If all the data buffers write finish + * for a checkpoint round, it renames the last new file path as the desired file name + * (name with the expected file ID). + * + * @param Payload type + * @param Input type + * @param Key type + * @param Output type + */ +public class FlinkMergeHandle + extends HoodieMergeHandle + implements MiniBatchHandle { + + private static final Logger LOG = LogManager.getLogger(FlinkMergeHandle.class); + + /** + * Records the current file handles number that rolls over. + */ + private int rollNumber = 0; + /** + * Records the rolled over file paths. + */ + private List rolloverPaths; + /** + * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet. + */ + private boolean needBootStrap = true; + + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Iterator> recordItr, String partitionPath, String fileId, + TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, taskContextSupplier); + rolloverPaths = new ArrayList<>(); + } + + /** + * Called by compactor code path. + */ + public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, + Map> keyToNewRecords, String partitionPath, String fileId, + HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { + super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, + dataFileToBeMerged, taskContextSupplier); + } + + /** + * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + */ + protected String generatesDataFileName() { + return FSUtils.makeDataFileName(instantTime, writeToken, fileId + "-" + rollNumber, hoodieTable.getBaseFileExtension()); + } + + public boolean isNeedBootStrap() { + return needBootStrap; + } + + @Override + public List close() { + List writeStatus = super.close(); + this.needBootStrap = false; + return writeStatus; + } + + /** + * The difference with the parent method is that there is no need to set up + * locations for the records. + * + * @param fileId The file ID + * @param newRecordsItr The incremental records iterator + */ + @Override + protected void init(String fileId, Iterator> newRecordsItr) { + initializeIncomingRecordsMap(); + while (newRecordsItr.hasNext()) { + HoodieRecord record = newRecordsItr.next(); + // NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist + keyToNewRecords.put(record.getRecordKey(), record); + } + LOG.info(String.format("Number of entries in MemoryBasedMap => %d\n" + + "Total size in bytes of MemoryBasedMap => %d\n" + + "Number of entries in DiskBasedMap => %d\n" + + "Size of file spilled to disk => %d", + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries(), + ((ExternalSpillableMap) keyToNewRecords).getCurrentInMemoryMapSize(), + ((ExternalSpillableMap) keyToNewRecords).getDiskBasedMapNumEntries(), + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes())); + } + + /** + * + * Rolls over the write handle to prepare for the next batch write. + * + *

It tweaks the handle state as follows (the call site is sketched right after this list): + * + *   • Increment the {@code rollNumber} + *   • Book-keep the last file path; these files (except the last one) are temporary and need to be cleaned up + *   • Make the last new file path the old one to merge from + *   • Initialize the new file path and the file writer
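For orientation, the snippet below condenses the call site that drives this roll-over; it paraphrases the getUpdateHandle change to BaseFlinkCommitActionExecutor later in this patch, with writeHandle and recordItr standing in for the executor's field and argument:

    // Reuse one FlinkMergeHandle per file group during a checkpoint interval: the first
    // mini-batch bootstraps the handle, every later mini-batch rolls it over so the new
    // records go into a fresh roll-over file.
    FlinkMergeHandle mergeHandle = (FlinkMergeHandle) writeHandle;
    if (!mergeHandle.isNeedBootStrap()) {
      mergeHandle.rollOver(recordItr);
    }
    return mergeHandle;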
+ * + * @param newRecordsItr The records iterator to update + */ + public void rollOver(Iterator> newRecordsItr) { + init(this.fileId, newRecordsItr); + this.recordsWritten = 0; + this.recordsDeleted = 0; + this.updatedRecordsWritten = 0; + this.insertRecordsWritten = 0; + this.writeStatus.setTotalErrorRecords(0); + this.timer = new HoodieTimer().startTimer(); + + rollNumber++; + + rolloverPaths.add(newFilePath); + oldFilePath = newFilePath; + // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. + String newFileName = generatesDataFileName(); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + newFilePath = new Path(config.getBasePath(), relativePath); + + try { + fileWriter = createNewFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchemaWithMetafields, taskContextSupplier); + } catch (IOException e) { + throw new HoodieIOException("Error when creating file writer for path " + newFilePath, e); + } + + LOG.info(String.format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(), + newFilePath.toString())); + } + + public void finishWrite() { + for (int i = 0; i < rolloverPaths.size() - 1; i++) { + Path path = rolloverPaths.get(i); + try { + fs.delete(path, false); + } catch (IOException e) { + throw new HoodieIOException("Error when clean the temporary roll file: " + path, e); + } + } + Path lastPath = rolloverPaths.size() > 0 + ? rolloverPaths.get(rolloverPaths.size() - 1) + : newFilePath; + String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + + newFileName).toString(); + final Path desiredPath = new Path(config.getBasePath(), relativePath); + try { + fs.rename(lastPath, desiredPath); + } catch (IOException e) { + throw new HoodieIOException("Error when rename the temporary roll file: " + lastPath + " to: " + desiredPath, e); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java new file mode 100644 index 0000000000000..2cae807a9ce3b --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/MiniBatchHandle.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +/** + * Hoodie write handle that supports write as mini-batch. + */ +public interface MiniBatchHandle { + /** + * Finish the write of multiple mini-batches. Usually these mini-bathes + * come from a checkpoint interval. 
+ */ + void finishWrite(); +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java new file mode 100644 index 0000000000000..c0699ff8e2813 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/ExplicitWriteHandleTable.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import java.util.List; + +/** + * HoodieTable that need to pass in the + * {@link org.apache.hudi.io.HoodieWriteHandle} explicitly. + */ +public interface ExplicitWriteHandleTable { + /** + * Upsert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> upsert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records); + + /** + * Insert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> insert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records); + + /** + * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be + * de-duped and non existent keys will be removed before deleting. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> delete( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List keys); + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> upsertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords); + + /** + * Inserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + HoodieWriteMetadata> insertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords); +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 4d7edd79d7baa..ddc3fbe277e79 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata; import org.apache.hudi.table.action.clean.FlinkCleanActionExecutor; @@ -61,14 +62,117 @@ protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineCont super(config, context, metaClient); } + /** + * Upsert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> upsert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records) { + return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute(); + } + + /** + * Insert a batch of new records into Hoodie table at the supplied instantTime. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param records hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> insert( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> records) { + return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute(); + } + + /** + * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be + * de-duped and non existent keys will be removed before deleting. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param writeHandle The write handle + * @param instantTime Instant Time for the action + * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> delete( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List keys) { + return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute(); + } + + /** + * Upserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> upsertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords) { + return new FlinkUpsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); + } + + /** + * Inserts the given prepared records into the Hoodie table, at the supplied instantTime. + * + *

This implementation requires that the input records are already tagged, and de-duped if needed. + * + *

Specifies the write handle explicitly in order to have fine grained control with + * the underneath file. + * + * @param context HoodieEngineContext + * @param instantTime Instant Time for the action + * @param preppedRecords hoodieRecords to upsert + * @return HoodieWriteMetadata + */ + public HoodieWriteMetadata> insertPrepped( + HoodieEngineContext context, + HoodieWriteHandle writeHandle, + String instantTime, + List> preppedRecords) { + return new FlinkInsertPreppedCommitActionExecutor<>(context, writeHandle, config, this, instantTime, preppedRecords).execute(); + } + @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, List> records) { - return new FlinkUpsertCommitActionExecutor<>(context, config, this, instantTime, records).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override public HoodieWriteMetadata> insert(HoodieEngineContext context, String instantTime, List> records) { - return new FlinkInsertCommitActionExecutor<>(context, config, this, instantTime, records).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override @@ -81,7 +185,7 @@ public HoodieWriteMetadata> bulkInsert(HoodieEngineContext con @Override public HoodieWriteMetadata> delete(HoodieEngineContext context, String instantTime, List keys) { - return new FlinkDeleteCommitActionExecutor<>(context, config, this, instantTime, keys).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override @@ -91,12 +195,12 @@ public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String @Override public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, String instantTime, List> preppedRecords) { - return new FlinkUpsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override public HoodieWriteMetadata> insertPrepped(HoodieEngineContext context, String instantTime, List> preppedRecords) { - return new FlinkInsertPreppedCommitActionExecutor<>(context, config, this, instantTime, preppedRecords).execute(); + throw new HoodieNotSupportedException("This method should not be invoked"); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index a8e6ed1e077ad..6b7c4a69aee73 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -36,7 +36,8 @@ import java.util.List; public abstract class HoodieFlinkTable - extends HoodieTable>, List, List> { + extends HoodieTable>, List, List> + implements ExplicitWriteHandleTable { protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) { super(config, context, metaClient); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 044f841d27615..9b6dcd6335c7c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -32,11 +32,14 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.execution.FlinkLazyInsertIterable; -import org.apache.hudi.io.FlinkCreateHandleFactory; +import org.apache.hudi.io.ExplicitCreateHandleFactory; +import org.apache.hudi.io.FlinkMergeHandle; +import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; -import org.apache.hudi.io.HoodieSortedMergeHandle; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -71,21 +74,26 @@ public abstract class BaseFlinkCommitActionExecutor writeHandle; + public BaseFlinkCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) { - super(context, config, table, instantTime, operationType, Option.empty()); + this(context, writeHandle, config, table, instantTime, operationType, Option.empty()); } public BaseFlinkCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option extraMetadata) { super(context, config, table, instantTime, operationType, extraMetadata); + this.writeHandle = writeHandle; } @Override @@ -182,6 +190,16 @@ protected Iterator> handleUpsertPartition( case INSERT: return handleInsert(fileIdHint, recordItr); case UPDATE: + if (this.writeHandle instanceof HoodieCreateHandle) { + // During one checkpoint interval, an insert record could also be updated, + // for example, for an operation sequence of a record: + // I, U, | U, U + // - batch1 - | - batch2 - + // the first batch(batch1) operation triggers an INSERT bucket, + // the second batch batch2 tries to reuse the same bucket + // and append instead of UPDATE. + return handleInsert(fileIdHint, recordItr); + } return handleUpdate(partitionPath, fileIdHint, recordItr); default: throw new HoodieUpsertException("Unknown bucketType " + bucketType + " for partition :" + partitionPath); @@ -203,7 +221,7 @@ public Iterator> handleUpdate(String partitionPath, String fil return Collections.singletonList((List) Collections.EMPTY_LIST).iterator(); } // these are updates - HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr); + HoodieMergeHandle upsertHandle = getUpdateHandle(recordItr); return handleUpdateInternal(upsertHandle, fileId); } @@ -225,11 +243,16 @@ protected Iterator> handleUpdateInternal(HoodieMergeHandle> recordItr) { + protected FlinkMergeHandle getUpdateHandle(Iterator> recordItr) { if (table.requireSortedRecords()) { - return new HoodieSortedMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); + throw new HoodieNotSupportedException("Sort records are not supported in Flink streaming write"); } else { - return new HoodieMergeHandle<>(config, instantTime, table, recordItr, partitionPath, fileId, taskContextSupplier); + FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle; + // add the incremental records. 
+ if (!writeHandle.isNeedBootStrap()) { + writeHandle.rollOver(recordItr); + } + return writeHandle; } } @@ -242,6 +265,6 @@ public Iterator> handleInsert(String idPfx, Iterator) Collections.EMPTY_LIST).iterator(); } return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx, - taskContextSupplier, new FlinkCreateHandleFactory<>()); + taskContextSupplier, new ExplicitCreateHandleFactory<>(writeHandle)); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java index 4b46f7fe7d6fa..2064be3e67113 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -33,9 +34,12 @@ public class FlinkDeleteCommitActionExecutor> e private final List keys; public FlinkDeleteCommitActionExecutor(HoodieEngineContext context, - HoodieWriteConfig config, HoodieTable table, - String instantTime, List keys) { - super(context, config, table, instantTime, WriteOperationType.DELETE); + HoodieWriteHandle writeHandle, + HoodieWriteConfig config, + HoodieTable table, + String instantTime, + List keys) { + super(context, writeHandle, config, table, instantTime, WriteOperationType.DELETE); this.keys = keys; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java index e0c47db26e484..041598314f343 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,11 +35,12 @@ public class FlinkInsertCommitActionExecutor> e private List> inputRecords; public FlinkInsertCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> inputRecords) { - super(context, config, table, instantTime, WriteOperationType.INSERT); + super(context, writeHandle, config, table, instantTime, WriteOperationType.INSERT); this.inputRecords = inputRecords; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java index 8e535475a465b..459a6dbc8f672 100644 --- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkInsertPreppedCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,9 +35,10 @@ public class FlinkInsertPreppedCommitActionExecutor> preppedRecords; public FlinkInsertPreppedCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> preppedRecords) { - super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED); + super(context, writeHandle, config, table, instantTime, WriteOperationType.INSERT_PREPPED); this.preppedRecords = preppedRecords; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java index d34aca22f049c..539f551c92585 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkMergeHelper.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -64,13 +65,15 @@ public void runMerge(HoodieTable>, List, List HoodieMergeHandle>, List, List> upsertHandle) throws IOException { final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation(); Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf()); - HoodieMergeHandle>, List, List> mergeHandle = upsertHandle; + FlinkMergeHandle>, List, List> mergeHandle = + (FlinkMergeHandle>, List, List>) upsertHandle; HoodieBaseFile baseFile = mergeHandle.baseFileForMerge(); final GenericDatumWriter gWriter; final GenericDatumReader gReader; Schema readSchema; - if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) { + if (mergeHandle.isNeedBootStrap() + && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) { readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetafields()); @@ -84,7 +87,7 @@ public void runMerge(HoodieTable>, List, List HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; - if (baseFile.getBootstrapBaseFile().isPresent()) { + if (mergeHandle.isNeedBootStrap() && baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { readerIterator = 
reader.getRecordIterator(readSchema); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java index 7842e259f6473..5859bb585fcd9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,11 +35,12 @@ public class FlinkUpsertCommitActionExecutor> e private List> inputRecords; public FlinkUpsertCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> inputRecords) { - super(context, config, table, instantTime, WriteOperationType.UPSERT); + super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT); this.inputRecords = inputRecords; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java index a6ecd93199ec2..42d932a2bd7f9 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkUpsertPreppedCommitActionExecutor.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -34,9 +35,10 @@ public class FlinkUpsertPreppedCommitActionExecutor> preppedRecords; public FlinkUpsertPreppedCommitActionExecutor(HoodieEngineContext context, + HoodieWriteHandle writeHandle, HoodieWriteConfig config, HoodieTable table, String instantTime, List> preppedRecords) { - super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED); + super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT_PREPPED); this.preppedRecords = preppedRecords; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java index 655fd1aeff6f2..f163b02f6a4bd 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java @@ -159,6 +159,12 @@ private FlinkOptions() { .defaultValue(4) .withDescription("Parallelism of tasks that do actual write, default is 4"); + public static final ConfigOption WRITE_BATCH_SIZE = ConfigOptions + .key("write.batch.size.MB") + .doubleType() + .defaultValue(128D) // 128MB + .withDescription("Batch buffer size in MB to flush data into the underneath filesystem"); + // 
------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java index 4309bb008b5a3..d7c0256b7feea 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java @@ -34,6 +34,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -50,7 +51,9 @@ /** * A {@link KeyedProcessFunction} where the write operations really happens. */ -public class KeyedWriteProcessFunction extends KeyedProcessFunction, Integer>> implements CheckpointedFunction { +public class KeyedWriteProcessFunction + extends KeyedProcessFunction, Integer>> + implements CheckpointedFunction, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(KeyedWriteProcessFunction.class); /** @@ -160,6 +163,11 @@ public void processElement(HoodieRecord hoodieRecord, Context context, Collector putDataIntoBuffer(hoodieRecord); } + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + this.writeClient.cleanHandles(); + } + public boolean hasRecordsIn() { return hasRecordsIn; } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java index 58770982418a8..ba6cea5614efa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteFunction.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ObjectSizeCalculator; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; import org.apache.hudi.table.action.commit.FlinkWriteHelper; @@ -33,6 +34,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -48,6 +50,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; @@ -58,33 +61,39 @@ *

Work Flow

* *

The function firstly buffers the data as a batch of {@link HoodieRecord}s, - * It flushes(write) the records batch when a Flink checkpoint starts. After a batch has been written successfully, + * It flushes (writes) the record batch when the batch exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * or when a Flink checkpoint starts. After a batch has been written successfully, * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. * - *
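For reference, the following is a minimal standalone sketch of the size-triggered flush idea described above: records accumulate in a buffer, an estimated byte count is tracked, and the buffer is flushed either when the threshold is crossed or when a checkpoint asks for it. The class and method names (SizeTriggeredBuffer, flushOnCheckpoint) are illustrative only, not the patch's actual StreamWriteFunction or BufferSizeDetector API.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Illustrative sketch only: buffers records and flushes either when the
    // estimated buffer size crosses a threshold or when a checkpoint requests it.
    public class SizeTriggeredBuffer<T> {
      private final long thresholdBytes;
      private final Consumer<List<T>> flusher;       // what to do with a full batch
      private final List<T> buffer = new ArrayList<>();
      private long bufferedBytes = 0L;

      public SizeTriggeredBuffer(double thresholdMb, Consumer<List<T>> flusher) {
        this.thresholdBytes = (long) (thresholdMb * 1024 * 1024);
        this.flusher = flusher;
      }

      /** Adds a record; flushes eagerly if the buffer grows past the threshold. */
      public void add(T record, long estimatedSizeBytes) {
        buffer.add(record);
        bufferedBytes += estimatedSizeBytes;
        if (bufferedBytes > thresholdBytes) {
          flush();
        }
      }

      /** Called when a checkpoint starts: the remaining buffer is always flushed. */
      public void flushOnCheckpoint() {
        flush();
      }

      private void flush() {
        if (!buffer.isEmpty()) {
          flusher.accept(new ArrayList<>(buffer));
          buffer.clear();
          bufferedBytes = 0L;
        }
      }

      public static void main(String[] args) {
        // Tiny threshold purely for demonstration.
        SizeTriggeredBuffer<String> buf =
            new SizeTriggeredBuffer<>(0.000001, batch -> System.out.println("flush " + batch.size()));
        buf.add("a", 1);
        buf.add("b", 1);
        buf.flushOnCheckpoint();
      }
    }

In the actual patch the threshold comes from write.batch.size.MB and the per-record size is sampled rather than passed in, but the control flow is the same shape.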

Exactly-once Semantics

+ *

The Semantics

* *

The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator * starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists. - * The function process thread then block data buffering and the checkpoint thread starts flushing the existing data buffer. - * When the existing data buffer write successfully, the process thread unblock and start buffering again for the next round checkpoint. - * Because any checkpoint failures would trigger the write rollback, it implements the exactly-once semantics. + * + *

In order to improve the throughput, the function process thread does not block data buffering + * after the checkpoint thread starts flushing the existing data buffer. So there is a possibility that the next checkpoint's + * batch is written into the current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records + * (e.g. the eagerly written batch), but the semantics are still correct because of the UPSERT operation. * *
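As a rough illustration of why duplicates from an eagerly flushed batch stay harmless, the toy example below models UPSERT as a last-write-wins map keyed by record key; this is an assumption-level sketch of upsert semantics, not Hudi's actual merge logic, and the class name UpsertIdempotenceDemo is hypothetical.

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // An UPSERT keyed by record key is idempotent: a replayed duplicate simply
    // overwrites the same key, so the final table content does not change.
    public class UpsertIdempotenceDemo {
      public static void main(String[] args) {
        // The record with key "id1" appears twice because one copy was flushed
        // eagerly as a mini-batch and then replayed after a checkpoint failure.
        List<String[]> incoming = Arrays.asList(
            new String[]{"id1", "Danny,23"},
            new String[]{"id1", "Danny,23"},   // duplicate of the eager batch
            new String[]{"id2", "Stephen,33"});

        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : incoming) {
          table.put(record[0], record[1]);     // upsert: last write wins per key
        }

        // Only one row per key survives.
        table.forEach((k, v) -> System.out.println(k + " -> " + v));
      }
    }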

Fault Tolerance

* - *

The operator coordinator checks the validity for the last instant when it starts a new one. The operator rolls back - * the written data and throws when any error occurs. This means any checkpoint or task failure would trigger a failover. - * The operator coordinator would try several times when committing the writestatus. + *

The operator coordinator checks and commits the last instant, then starts a new one, when a checkpoint finishes successfully. + * The operator rolls back the written data and throws to trigger a failover when any error occurs. + * This means one Hoodie instant may span one or more checkpoints (some checkpoint notifications may be skipped). + * If a checkpoint times out, the next checkpoint rewrites the remaining buffered data (the buffer is cleaned in the last + * step of the #flushBuffer method). + * + *

The operator coordinator retries several times when committing the write status. * -
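A simplified standalone sketch of such a bounded commit retry is shown below, assuming a generic Callable-based commit action with hypothetical names (CommitRetry, commitWithRetry); it mirrors the retry-then-throw behavior described above rather than the coordinator's real checkAndCommitWithRetry method.

    import java.util.concurrent.Callable;

    // Retries a commit action up to retryTimes, sleeping between attempts, and
    // throws once the attempts are exhausted.
    public class CommitRetry {
      public static void commitWithRetry(Callable<Boolean> commitAction,
                                         int retryTimes,
                                         long retryIntervalMillis) throws Exception {
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
          try {
            if (commitAction.call()) {
              return;                              // commit succeeded
            }
          } catch (Exception e) {
            if (attempt == retryTimes) {
              throw new IllegalStateException("Commit failed after " + retryTimes + " attempts", e);
            }
          }
          Thread.sleep(retryIntervalMillis);        // back off before the next attempt
        }
        throw new IllegalStateException("Commit not acknowledged after " + retryTimes + " attempts");
      }

      public static void main(String[] args) throws Exception {
        // Succeeds on the third attempt in this toy run.
        int[] calls = {0};
        commitWithRetry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println("committed after " + calls[0] + " attempts");
      }
    }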

Note: The function task requires the input stream be partitioned by the partition fields to avoid different write tasks - * write to the same file group that conflict. The general case for partition path is a datetime field, - * so the sink task is very possible to have IO bottleneck, the more flexible solution is to shuffle the - * data by the file group IDs. + *

Note: The function task requires the input stream be shuffled by the file IDs. * * @param Type of the input record * @see StreamWriteOperatorCoordinator */ -public class StreamWriteFunction extends KeyedProcessFunction implements CheckpointedFunction { +public class StreamWriteFunction + extends KeyedProcessFunction + implements CheckpointedFunction, CheckpointListener { private static final long serialVersionUID = 1L; @@ -137,10 +146,15 @@ public class StreamWriteFunction extends KeyedProcessFunction */ private transient OperatorEventGateway eventGateway; + /** + * The detector that tells if to flush the data as mini-batch. + */ + private transient BufferSizeDetector detector; + /** * Constructs a StreamingSinkFunction. * - * @param config The config options + * @param config The config options */ public StreamWriteFunction(Configuration config) { this.config = config; @@ -149,6 +163,7 @@ public StreamWriteFunction(Configuration config) { @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.detector = new BufferSizeDetector(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE)); initBuffer(); initWriteClient(); initWriteFunction(); @@ -166,11 +181,8 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { // Based on the fact that the coordinator starts the checkpoint first, // it would check the validity. this.onCheckpointing = true; - this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); - Preconditions.checkNotNull(this.currentInstant, - "No inflight instant when flushing data"); // wait for the buffer data flush out and request a new instant - flushBuffer(); + flushBuffer(true); // signal the task thread to start buffering addToBufferCondition.signal(); } finally { @@ -186,6 +198,7 @@ public void processElement(I value, KeyedProcessFunction.Context ctx, C if (onCheckpointing) { addToBufferCondition.await(); } + flushBufferOnCondition(value); putDataIntoBuffer(value); } finally { bufferLock.unlock(); @@ -199,6 +212,11 @@ public void close() { } } + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.writeClient.cleanHandles(); + } + // ------------------------------------------------------------------------- // Getter/Setter // ------------------------------------------------------------------------- @@ -252,6 +270,42 @@ private void initWriteFunction() { } } + /** + * Tool to detect if to flush out the existing buffer. + * Sampling the record to compute the size with 0.01 percentage. 
+ */ + private static class BufferSizeDetector { + private final Random random = new Random(47); + private static final int DENOMINATOR = 100; + + private final double batchSizeBytes; + + private long lastRecordSize = -1L; + private long totalSize = 0L; + + BufferSizeDetector(double batchSizeMb) { + this.batchSizeBytes = batchSizeMb * 1024 * 1024; + } + + boolean detect(Object record) { + if (lastRecordSize == -1 || sampling()) { + lastRecordSize = ObjectSizeCalculator.getObjectSize(record); + } + totalSize += lastRecordSize; + return totalSize > this.batchSizeBytes; + } + + boolean sampling() { + // 0.01 sampling percentage + return random.nextInt(DENOMINATOR) == 1; + } + + void reset() { + this.lastRecordSize = -1L; + this.totalSize = 0L; + } + } + private void putDataIntoBuffer(I value) { HoodieRecord record = (HoodieRecord) value; final String fileId = record.getCurrentLocation().getFileId(); @@ -262,8 +316,25 @@ private void putDataIntoBuffer(I value) { this.buffer.get(key).add(record); } + /** + * Flush the data buffer if the buffer size is greater than + * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. + * + * @param value HoodieRecord + */ + private void flushBufferOnCondition(I value) { + boolean needFlush = this.detector.detect(value); + if (needFlush) { + flushBuffer(false); + this.detector.reset(); + } + } + @SuppressWarnings("unchecked, rawtypes") - private void flushBuffer() { + private void flushBuffer(boolean isFinalBatch) { + this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE)); + Preconditions.checkNotNull(this.currentInstant, + "No inflight instant when flushing data"); final List writeStatus; if (buffer.size() > 0) { writeStatus = new ArrayList<>(); @@ -278,12 +349,13 @@ private void flushBuffer() { writeStatus.addAll(writeFunction.apply(records, currentInstant)); } }); - this.buffer.clear(); } else { LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); writeStatus = Collections.emptyList(); } - this.eventGateway.sendEventToCoordinator(new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus)); + this.eventGateway.sendEventToCoordinator( + new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch)); + this.buffer.clear(); this.currentInstant = ""; } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java index bf0cfc27e91e5..bd933c212f0fa 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java @@ -80,12 +80,15 @@ public class StreamWriteOperatorCoordinator */ private transient HoodieFlinkWriteClient writeClient; + /** + * Current data buffering checkpoint. + */ private long inFlightCheckpoint = -1; /** * Current REQUESTED instant, for validation. */ - private String inFlightInstant = ""; + private String instant = ""; /** * Event buffer for one round of checkpointing. When all the elements are non-null and have the same @@ -119,6 +122,8 @@ public void start() throws Exception { initWriteClient(); // init table, create it if not exists. 
initTableIfNotExists(this.conf); + // start a new instant + startInstant(); } @Override @@ -132,20 +137,14 @@ public void close() { @Override public void checkpointCoordinator(long checkpointId, CompletableFuture result) { try { - final String errMsg = "A new checkpoint starts while the last checkpoint buffer" - + " data has not finish writing, roll back the last write and throw"; - checkAndForceCommit(errMsg); - this.inFlightInstant = this.writeClient.startCommit(); - this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); this.inFlightCheckpoint = checkpointId; - LOG.info("Create instant [{}], at checkpoint [{}]", this.inFlightInstant, checkpointId); result.complete(writeCheckpointBytes()); } catch (Throwable throwable) { // when a checkpoint fails, throws directly. result.completeExceptionally( new CompletionException( String.format("Failed to checkpoint Instant %s for source %s", - this.inFlightInstant, this.getClass().getSimpleName()), throwable)); + this.instant, this.getClass().getSimpleName()), throwable)); } } @@ -153,6 +152,15 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r public void checkpointComplete(long checkpointId) { // start to commit the instant. checkAndCommitWithRetry(); + // start new instant. + startInstant(); + } + + private void startInstant() { + this.instant = this.writeClient.startCommit(); + this.writeClient.transitionRequestedToInflight(conf.getString(FlinkOptions.TABLE_TYPE), this.instant); + LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, + this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); } public void notifyCheckpointAborted(long checkpointId) { @@ -175,10 +183,14 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent, "The coordinator can only handle BatchWriteSuccessEvent"); BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent; - Preconditions.checkState(event.getInstantTime().equals(this.inFlightInstant), + Preconditions.checkState(event.getInstantTime().equals(this.instant), String.format("Receive an unexpected event for instant %s from task %d", event.getInstantTime(), event.getTaskID())); - this.eventBuffer[event.getTaskID()] = event; + if (this.eventBuffer[event.getTaskID()] != null) { + this.eventBuffer[event.getTaskID()].mergeWith(event); + } else { + this.eventBuffer[event.getTaskID()] = event; + } } @Override @@ -218,7 +230,7 @@ private byte[] writeCheckpointBytes() throws IOException { DataOutputStream out = new DataOutputViewStreamWrapper(baos)) { out.writeLong(this.inFlightCheckpoint); - byte[] serializedInstant = this.inFlightInstant.getBytes(); + byte[] serializedInstant = this.instant.getBytes(); out.writeInt(serializedInstant.length); out.write(serializedInstant); out.flush(); @@ -239,12 +251,12 @@ private void deserializeCheckpointAndRestore(byte[] bytes) throws Exception { int serializedInstantSize = in.readInt(); byte[] serializedInstant = readBytes(in, serializedInstantSize); this.inFlightCheckpoint = checkpointID; - this.inFlightInstant = new String(serializedInstant); + this.instant = new String(serializedInstant); } } private void reset() { - this.inFlightInstant = ""; + this.instant = ""; this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism]; } @@ -253,8 +265,8 @@ private void checkAndForceCommit(String errMsg) { // forced but still has inflight instant String 
inflightInstant = writeClient.getInflightAndRequestedInstant(this.conf.getString(FlinkOptions.TABLE_TYPE)); if (inflightInstant != null) { - assert inflightInstant.equals(this.inFlightInstant); - writeClient.rollback(this.inFlightInstant); + assert inflightInstant.equals(this.instant); + writeClient.rollback(this.instant); throw new HoodieException(errMsg); } if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) { @@ -277,6 +289,10 @@ private void checkAndCommitWithRetry() { if (!checkReady()) { // Do not throw if the try times expires but the event buffer are still not ready, // because we have a force check when next checkpoint starts. + if (tryTimes == retryTimes) { + // Throw if the try times expires but the event buffer are still not ready + throw new HoodieException("Try " + retryTimes + " to commit instant [" + this.instant + "] failed"); + } sleepFor(retryIntervalMillis); continue; } @@ -284,9 +300,9 @@ private void checkAndCommitWithRetry() { return; } catch (Throwable throwable) { String cause = throwable.getCause() == null ? "" : throwable.getCause().toString(); - LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.inFlightInstant, tryTimes, cause); + LOG.warn("Try to commit the instant {} failed, with times {} and cause {}", this.instant, tryTimes, cause); if (tryTimes == retryTimes) { - throw new HoodieException(throwable); + throw new HoodieException("Not all write tasks finish the batch write to commit", throwable); } sleepFor(retryIntervalMillis); } @@ -307,8 +323,8 @@ private void sleepFor(long intervalMillis) { /** Checks the buffer is ready to commit. */ private boolean checkReady() { - return Arrays.stream(eventBuffer).allMatch(event -> - event != null && event.getInstantTime().equals(this.inFlightInstant)); + return Arrays.stream(eventBuffer) + .allMatch(event -> event != null && event.isReady(this.instant)); } /** Performs the actual commit action. */ @@ -320,7 +336,7 @@ private void doCommit() { if (writeResults.size() == 0) { // No data has written, clear the metadata file - this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.inFlightInstant); + this.writeClient.deletePendingInstant(this.conf.getString(FlinkOptions.TABLE_TYPE), this.instant); reset(); return; } @@ -337,12 +353,12 @@ private void doCommit() { + totalErrorRecords + "/" + totalRecords); } - boolean success = writeClient.commit(this.inFlightInstant, writeResults, Option.of(checkpointCommitMetadata)); + boolean success = writeClient.commit(this.instant, writeResults, Option.of(checkpointCommitMetadata)); if (success) { reset(); - LOG.info("Commit instant [{}] success!", this.inFlightInstant); + LOG.info("Commit instant [{}] success!", this.instant); } else { - throw new HoodieException(String.format("Commit instant [%s] failed!", this.inFlightInstant)); + throw new HoodieException(String.format("Commit instant [%s] failed!", this.instant)); } } else { LOG.error("Error when writing. 
Errors/Total=" + totalErrorRecords + "/" + totalRecords); @@ -355,8 +371,8 @@ private void doCommit() { } }); // Rolls back instant - writeClient.rollback(this.inFlightInstant); - throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.inFlightInstant)); + writeClient.rollback(this.instant); + throw new HoodieException(String.format("Commit instant [%s] failed and rolled back !", this.instant)); } } @@ -366,8 +382,14 @@ public BatchWriteSuccessEvent[] getEventBuffer() { } @VisibleForTesting - public String getInFlightInstant() { - return inFlightInstant; + public String getInstant() { + return instant; + } + + @VisibleForTesting + @SuppressWarnings("rawtypes") + public HoodieFlinkWriteClient getWriteClient() { + return writeClient; } /** diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java index db5432e1e9244..f03e8d3e27f00 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/event/BatchWriteSuccessEvent.java @@ -21,7 +21,9 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.util.ValidationUtils; +import java.util.ArrayList; import java.util.List; /** @@ -30,17 +32,38 @@ public class BatchWriteSuccessEvent implements OperatorEvent { private static final long serialVersionUID = 1L; - private final List writeStatuses; + private List writeStatuses; private final int taskID; private final String instantTime; + private boolean isLastBatch; public BatchWriteSuccessEvent( int taskID, String instantTime, List writeStatuses) { + this(taskID, instantTime, writeStatuses, false); + } + + /** + * Creates an event. + * + * @param taskID The task ID + * @param instantTime The instant time under which to write the data + * @param writeStatuses The write statues list + * @param isLastBatch Whether the event reports the last batch + * within an checkpoint interval, + * if true, the whole data set of the checkpoint + * has been flushed successfully + */ + public BatchWriteSuccessEvent( + int taskID, + String instantTime, + List writeStatuses, + boolean isLastBatch) { this.taskID = taskID; this.instantTime = instantTime; - this.writeStatuses = writeStatuses; + this.writeStatuses = new ArrayList<>(writeStatuses); + this.isLastBatch = isLastBatch; } public List getWriteStatuses() { @@ -54,4 +77,28 @@ public int getTaskID() { public String getInstantTime() { return instantTime; } + + public boolean isLastBatch() { + return isLastBatch; + } + + /** + * Merges this event with given {@link BatchWriteSuccessEvent} {@code other}. + * + * @param other The event to be merged + */ + public void mergeWith(BatchWriteSuccessEvent other) { + ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime)); + ValidationUtils.checkArgument(this.taskID == other.taskID); + this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true. + List statusList = new ArrayList<>(); + statusList.addAll(this.writeStatuses); + statusList.addAll(other.writeStatuses); + this.writeStatuses = statusList; + } + + /** Returns whether the event is ready to commit. 
*/ + public boolean isReady(String currentInstant) { + return isLastBatch && this.instantTime.equals(currentInstant); + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java index 269ccc801592a..590ee3be84356 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/partitioner/BucketAssignFunction.java @@ -89,7 +89,7 @@ public void open(Configuration parameters) throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) { - this.bucketAssigner.reset(); + // no operation } @Override @@ -144,6 +144,7 @@ public void processElement(I value, Context ctx, Collector out) throws Except @Override public void notifyCheckpointComplete(long l) { // Refresh the table state when there are new commits. + this.bucketAssigner.reset(); this.bucketAssigner.refreshTable(); } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4447705a9012f..1c402849331f7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; @@ -53,6 +54,7 @@ import java.io.IOException; import java.io.StringReader; import java.util.List; +import java.util.Objects; import java.util.Properties; /** @@ -293,4 +295,9 @@ public static void initTableIfNotExists(Configuration conf) throws IOException { public static String generateBucketKey(String partitionPath, String fileId) { return String.format("%s_%s", partitionPath, fileId); } + + /** Returns whether the location represents an insert. 
*/ + public static boolean isInsert(HoodieRecordLocation loc) { + return Objects.equals(loc.getInstantTime(), "I"); + } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java index fea9b8f92feb0..12a00dd8eaf1b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteFunctionTest.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; @@ -150,7 +151,8 @@ public void testCheckpoint() throws Exception { assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); funcWrapper.checkpointComplete(2); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, null); + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); } @@ -187,12 +189,13 @@ public void testCheckpointFails() throws Exception { funcWrapper.invoke(rowData); } - // this triggers the data write and event send - funcWrapper.checkpointFunction(2); - // Do not sent the write event and fails the checkpoint - assertThrows(HoodieException.class, - () -> funcWrapper.checkpointFails(2), - "The last checkpoint was aborted, roll back the last write and throw"); + // this triggers NPE cause there is no inflight instant + assertThrows(NullPointerException.class, + () -> funcWrapper.checkpointFunction(2), + "No inflight instant when flushing data"); + // do not sent the write event and fails the checkpoint, + // behaves like the last checkpoint is successful. + funcWrapper.checkpointFails(2); } @Test @@ -212,13 +215,13 @@ public void testInsert() throws Exception { final OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED1); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.REQUESTED, instant); funcWrapper.checkpointComplete(1); + checkWrittenData(tempFile, EXPECTED1); // the coordinator checkpoint commits the inflight instant. 
checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); checkWrittenData(tempFile, EXPECTED1); @@ -241,15 +244,16 @@ public void testInsertDuplicates() throws Exception { // this triggers the data write and event send funcWrapper.checkpointFunction(1); - final OperatorEvent nextEvent = funcWrapper.getNextEvent(); + OperatorEvent nextEvent = funcWrapper.getNextEvent(); assertThat("The operator expect to send an event", nextEvent, instanceOf(BatchWriteSuccessEvent.class)); - checkWrittenData(tempFile, EXPECTED3, 1); funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); funcWrapper.checkpointComplete(1); + checkWrittenData(tempFile, EXPECTED3, 1); + // insert duplicates again for (RowData rowData : TestData.DATA_SET_THREE) { funcWrapper.invoke(rowData); @@ -257,6 +261,10 @@ public void testInsertDuplicates() throws Exception { funcWrapper.checkpointFunction(2); + nextEvent = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent); + funcWrapper.checkpointComplete(2); + checkWrittenData(tempFile, EXPECTED3, 1); } @@ -306,10 +314,84 @@ public void testUpsert() throws Exception { checkWrittenData(tempFile, EXPECTED2); } + @Test + public void testInsertWithMiniBatches() throws Exception { + // reset the config option + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.001); // 1Kb batch size + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // Each record is 424 bytes. so 3 records expect to trigger a mini-batch write + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + Map> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("Should have 1 data bucket", dataBuffer.size(), is(1)); + assertThat("2 records expect to flush out as a mini-batch", + dataBuffer.values().stream().findFirst().map(List::size).orElse(-1), + is(3)); + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event2 = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", event2, instanceOf(BatchWriteSuccessEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, event1); + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getInflightAndRequestedInstant("COPY_ON_WRITE"); + + funcWrapper.checkpointComplete(1); + + Map expected = new HashMap<>(); + expected.put("par1", "[id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1, " + + "id1,par1,id1,Danny,23,1,par1]"); + checkWrittenData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_THREE) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + 
+ final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event4 = funcWrapper.getNextEvent(); + final OperatorEvent event5 = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, event3); + funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + funcWrapper.getCoordinator().handleEventFromOperator(0, event5); + funcWrapper.checkpointComplete(2); + + // Same the original base file content. + checkWrittenData(tempFile, expected, 1); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- + @SuppressWarnings("rawtypes") + private void checkInflightInstant(HoodieFlinkWriteClient writeClient) { + final String instant = writeClient.getInflightAndRequestedInstant("COPY_ON_WRITE"); + assertNotNull(instant); + } + @SuppressWarnings("rawtypes") private void checkInstantState( HoodieFlinkWriteClient writeClient, diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java index c533b48e5a296..732cc65be0582 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteOperatorCoordinatorTest.java @@ -18,8 +18,11 @@ package org.apache.hudi.operator; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.operator.event.BatchWriteSuccessEvent; import org.apache.hudi.operator.utils.TestConfigurations; import org.apache.hudi.util.StreamerUtil; @@ -37,6 +40,9 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -61,6 +67,34 @@ public void after() { coordinator.close(); } + @Test + void testInstantState() { + String instant = coordinator.getInstant(); + assertNotEquals("", instant); + + WriteStatus writeStatus = new WriteStatus(true, 0.1D); + writeStatus.setPartitionPath("par1"); + writeStatus.setStat(new HoodieWriteStat()); + OperatorEvent event0 = + new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true); + + WriteStatus writeStatus1 = new WriteStatus(false, 0.2D); + writeStatus1.setPartitionPath("par2"); + writeStatus1.setStat(new HoodieWriteStat()); + OperatorEvent event1 = + new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true); + coordinator.handleEventFromOperator(0, event0); + coordinator.handleEventFromOperator(1, event1); + + coordinator.checkpointComplete(1); + String inflight = coordinator.getWriteClient() + .getInflightAndRequestedInstant("COPY_ON_WRITE"); + String lastCompleted = coordinator.getWriteClient().getLastCompletedInstant("COPY_ON_WRITE"); + assertThat("Instant should be complete", lastCompleted, is(instant)); + assertNotEquals("", inflight, "Should start a new instant"); + assertNotEquals(instant, inflight, "Should start a new instant"); + } + @Test 
public void testTableInitialized() throws IOException { final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); @@ -88,14 +122,14 @@ public void testReceiveInvalidEvent() { } @Test - public void testCheckpointInvalid() { + public void testCheckpointCompleteWithRetry() { final CompletableFuture future = new CompletableFuture<>(); coordinator.checkpointCoordinator(1, future); - String inflightInstant = coordinator.getInFlightInstant(); + String inflightInstant = coordinator.getInstant(); OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList()); coordinator.handleEventFromOperator(0, event); - final CompletableFuture future2 = new CompletableFuture<>(); - coordinator.checkpointCoordinator(2, future2); - assertTrue(future2.isCompletedExceptionally()); + assertThrows(HoodieException.class, + () -> coordinator.checkpointComplete(1), + "Try 3 to commit instant"); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java index 59de2832664c0..6f7062c6fc019 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java +++ b/hudi-flink/src/test/java/org/apache/hudi/operator/utils/StreamWriteFunctionWrapper.java @@ -38,6 +38,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -77,11 +79,11 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E this.conf = conf; // one function this.coordinator = new StreamWriteOperatorCoordinator(conf, 1); - this.coordinator.start(); this.functionInitializationContext = new MockFunctionInitializationContext(); } public void openFunction() throws Exception { + this.coordinator.start(); toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf); toHoodieFunction.setRuntimeContext(runtimeContext); toHoodieFunction.open(conf); @@ -123,6 +125,10 @@ public OperatorEvent getNextEvent() { return this.gateway.getNextEvent(); } + public Map> getDataBuffer() { + return this.writeFunction.getBuffer(); + } + @SuppressWarnings("rawtypes") public HoodieFlinkWriteClient getWriteClient() { return this.writeFunction.getWriteClient(); @@ -141,6 +147,7 @@ public void checkpointComplete(long checkpointId) { functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId); coordinator.checkpointComplete(checkpointId); this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId); + this.writeFunction.notifyCheckpointComplete(checkpointId); } public void checkpointFails(long checkpointId) {