diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java index 70ee473d245dd..e1fd1618b77af 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java @@ -19,5 +19,5 @@ package org.apache.hudi.table.action.commit; public enum BucketType { - UPDATE, INSERT + UPDATE, INSERT, APPEND_ONLY }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 05e4481ecfbe7..71ca1b6e8056b 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -56,6 +56,7 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; import org.apache.hudi.util.FlinkClientUtil; @@ -408,6 +409,12 @@ public void cleanHandlesGracefully() { final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); final String partitionPath = record.getPartitionPath(); + // append-only mode always uses FlinkCreateHandle + if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) { + return new FlinkCreateHandle<>(config, instantTime, table, partitionPath, + fileID, table.getTaskContextSupplier()); + } + if (bucketToHandles.containsKey(fileID)) { MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); if (lastHandle.shouldReplace()) { @@ -424,7 +431,8 @@ public void cleanHandlesGracefully() { if (isDelta) { writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, table.getTaskContextSupplier()); - } else if (loc.getInstantTime().equals("I")) { + } else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) { + // use the same handle as for an insert bucket writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, fileID, table.getTaskContextSupplier()); } else {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 987f3350d5711..41d06667f9574 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; +import org.apache.hudi.table.action.commit.BucketType; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -80,7 +81,7 @@ protected boolean needsUpdateLocation() { @Override protected boolean isUpdateRecord(HoodieRecord hoodieRecord) { return hoodieRecord.getCurrentLocation() != null - && hoodieRecord.getCurrentLocation().getInstantTime().equals("U"); + && hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name()); } @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 3ff579fed4112..49d8918b9f191 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -30,6 +30,7 @@ import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; +import org.apache.hudi.table.MarkerFiles; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -70,6 +71,17 @@ public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTab } } + @Override + protected void createMarkerFile(String partitionPath, String dataFileName) { + // In some rare cases the task is pulled up again with the same write file name, + // e.g., when the small files from the last commit instant are reused. + + // Just skip the marker file creation if it already exists; the new data is simply + // appended to the existing file. + MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); + markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); + } + /** * The flink checkpoints start in sequence and asynchronously, when one write task finish the checkpoint(A) * (thus the fs view got the written data files some of which may be invalid),
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 5cfd28be2c1dc..7dcc2406b66d6 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -102,9 +102,7 @@ public HoodieWriteMetadata> execute(List> inpu final HoodieRecord record = inputRecords.get(0); final String partitionPath = record.getPartitionPath(); final String fileId = record.getCurrentLocation().getFileId(); - final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") - ? BucketType.INSERT - : BucketType.UPDATE; + final BucketType bucketType = BucketType.valueOf(record.getCurrentLocation().getInstantTime()); handleUpsertPartition( instantTime, partitionPath, @@ -185,6 +183,7 @@ protected Iterator> handleUpsertPartition( } else { switch (bucketType) { case INSERT: + case APPEND_ONLY: return handleInsert(fileIdHint, recordItr); case UPDATE: return handleUpdate(partitionPath, fileIdHint, recordItr);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index cd309d12b680e..1af4459c3f3f3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -186,6 +186,12 @@ private FlinkOptions() { .defaultValue(TABLE_TYPE_COPY_ON_WRITE) .withDescription("Type of table to write. 
COPY_ON_WRITE (or) MERGE_ON_READ"); + public static final ConfigOption<Boolean> APPEND_ONLY_ENABLE = ConfigOptions + .key("append_only.enable") + .booleanType() + .defaultValue(false) + .withDescription("Whether to write data to a new base file without looking up the index. Only supported for COPY_ON_WRITE tables, default false"); + public static final ConfigOption<String> OPERATION = ConfigOptions + .key("write.operation") + .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 75a3454300073..5cc239bca7f35 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; +import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -55,9 +57,9 @@ * it then assigns the bucket with ID using the {@link BucketAssigner}. * *

All the records are tagged with HoodieRecordLocation, instead of real instant time, - * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep + * INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant time. There is no need to keep * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides - * where the record should write to. The "I" and "U" tags are only used for downstream to decide whether + * where the record should write to. The "INSERT" and "UPDATE" tags are only used for downstream to decide whether * the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer * supports specifying the bucket type explicitly. * @@ -106,11 +108,18 @@ public class BucketAssignFunction> */ private final boolean globalIndex; + private final boolean appendOnly; + public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); + this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE); + if (appendOnly) { + ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()), + "APPEND_ONLY mode is only supported for COPY_ON_WRITE tables"); + } } @Override @@ -170,25 +179,33 @@ private void processRecord(HoodieRecord record, Collector out) throws Exce final String partitionPath = hoodieKey.getPartitionPath(); final HoodieRecordLocation location; + if (appendOnly) { + location = getNewRecordLocation(partitionPath); + this.context.setCurrentKey(recordKey); + record.setCurrentLocation(location); + out.collect((O) record); + return; + } + // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. HoodieRecordGlobalLocation oldLoc = indexState.value(); if (isChangingRecords && oldLoc != null) { - // Set up the instant time as "U" to mark the bucket as an update bucket. + // Set up the instant time as "UPDATE" to mark the bucket as an update bucket. if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) { if (globalIndex) { // if partition path changes, emit a delete record for old partition path, // then update the index state using location with new partition path. 
HoodieRecord deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()), payloadCreation.createDeletePayload((BaseAvroPayload) record.getData())); - deleteRecord.setCurrentLocation(oldLoc.toLocal("U")); + deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name())); deleteRecord.seal(); out.collect((O) deleteRecord); } location = getNewRecordLocation(partitionPath); updateIndexState(partitionPath, location); } else { - location = oldLoc.toLocal("U"); + location = oldLoc.toLocal(BucketType.UPDATE.name()); this.bucketAssigner.addUpdate(partitionPath, location.getFileId()); } } else { @@ -203,17 +220,26 @@ private void processRecord(HoodieRecord record, Collector out) throws Exce } private HoodieRecordLocation getNewRecordLocation(String partitionPath) { - final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath); + BucketInfo bucketInfo; + if (appendOnly) { + bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath); + } else { + bucketInfo = this.bucketAssigner.addInsert(partitionPath); + } + final HoodieRecordLocation location; switch (bucketInfo.getBucketType()) { case INSERT: - // This is an insert bucket, use HoodieRecordLocation instant time as "I". + // This is an insert bucket, use HoodieRecordLocation instant time as "INSERT". // Downstream operators can then check the instant time to know whether // a record belongs to an insert bucket. - location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); + location = new HoodieRecordLocation(BucketType.INSERT.name(), bucketInfo.getFileIdPrefix()); break; case UPDATE: - location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); + location = new HoodieRecordLocation(BucketType.UPDATE.name(), bucketInfo.getFileIdPrefix()); + break; + case APPEND_ONLY: + location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), bucketInfo.getFileIdPrefix()); break; default: throw new AssertionError(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index 6d805ce8d83ec..965b5575193d6 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -140,6 +140,14 @@ public BucketInfo addInsert(String partitionPath) { } // if we have anything more, create new insert buckets, like normal + return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT); + } + + public BucketInfo addAppendOnly(String partitionPath) { + return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY); + } + + private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType bucketType) { if (newFileAssignStates.containsKey(partitionPath)) { NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath); if (newFileAssignState.canAssign()) { @@ -148,7 +156,7 @@ public BucketInfo addInsert(String partitionPath) { final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); return bucketInfoMap.get(key); } - BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); + BucketInfo bucketInfo = new BucketInfo(bucketType, FSUtils.createNewFileIdPfx(), partitionPath); final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix()); bucketInfoMap.put(key, bucketInfo); newFileAssignStates.put(partitionPath, new 
NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 0b4533f7c606e..c024d7c5cc178 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -69,6 +69,9 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true) public String tableType; + @Parameter(names = {"--append-only"}, description = "Write data to a new parquet file in every checkpoint. Only supported for COPY_ON_WRITE tables.") + public Boolean appendOnly = false; + @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for " + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" @@ -290,7 +293,13 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); // copy_on_write works same as COPY_ON_WRITE conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); - conf.setString(FlinkOptions.OPERATION, config.operation.value()); + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly); + if (config.appendOnly) { + // append-only mode should use the INSERT operation + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + } else { + conf.setString(FlinkOptions.OPERATION, config.operation.value()); + } conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName); conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2eeb8f58b82a2..2c2193c78d403 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -18,6 +18,7 @@ package org.apache.hudi.table; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; @@ -144,6 +145,11 @@ private static void setupConfOptions( TableSchema schema) { // table name conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); + // append-only mode + if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) { + // append-only mode should use the INSERT operation + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + } // hoodie key about options setupHoodieKeyOptions(conf, table); // cleaning options
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index b3338a9083cb9..5f6f62c63b065 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -23,6 +23,7 @@ import 
org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; @@ -44,6 +45,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -509,6 +511,83 @@ public void testInsertWithDeduplication() throws Exception { checkWrittenData(tempFile, expected, 1); } + @Test + public void testAppendOnly() throws Exception { + // reset the config option + conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size + conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false); + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + + // open the function and ingest data + funcWrapper.openFunction(); + // Each record is 208 bytes, so 4 records are expected to trigger a mini-batch write + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + // this triggers the data write and event send + funcWrapper.checkpointFunction(1); + Map<String, List<HoodieRecord>> dataBuffer = funcWrapper.getDataBuffer(); + assertThat("All data should be flushed out", dataBuffer.size(), is(0)); + + final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event2 = funcWrapper.getNextEvent(); + assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); + + funcWrapper.getCoordinator().handleEventFromOperator(0, event1); + funcWrapper.getCoordinator().handleEventFromOperator(0, event2); + assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); + + String instant = funcWrapper.getWriteClient() + .getLastPendingInstant(getTableType()); + + funcWrapper.checkpointComplete(1); + + Map<String, List<String>> expected = new HashMap<>(); + + expected.put("par1", Arrays.asList( + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,4,par1")); + + TestData.checkWrittenAllData(tempFile, expected, 1); + + // started a new instant already + checkInflightInstant(funcWrapper.getWriteClient()); + checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); + + // insert duplicates again + for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { + funcWrapper.invoke(rowData); + } + + funcWrapper.checkpointFunction(2); + + final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first + final OperatorEvent event4 = funcWrapper.getNextEvent(); + funcWrapper.getCoordinator().handleEventFromOperator(0, event3); + funcWrapper.getCoordinator().handleEventFromOperator(0, event4); + funcWrapper.checkpointComplete(2); + + // The original base file content is kept and the new records are written to a new file, so every row appears twice. 
+ expected.put("par1", Arrays.asList( + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,0,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,1,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,2,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,3,par1", + "id1,par1,id1,Danny,23,4,par1", + "id1,par1,id1,Danny,23,4,par1")); + TestData.checkWrittenAllData(tempFile, expected, 1); + } + @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index 07e23b56edc92..b983e8c0ebbf6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -23,12 +23,14 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestData; @@ -37,6 +39,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.File; import java.util.Comparator; @@ -44,6 +47,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.junit.jupiter.api.Assertions.assertThrows; + /** * Test cases for delta stream write. */ @@ -86,6 +91,16 @@ protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED1; } + @Test + public void testAppendOnly() throws Exception { + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + assertThrows(IllegalArgumentException.class, () -> { + funcWrapper.openFunction(); + }, "APPEND_ONLY mode only support in COPY_ON_WRITE table"); + } + protected Map getMiniBatchExpected() { Map expected = new HashMap<>(); // MOR mode merges the messages with the same key. 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 13a71ecb80245..10e7d3d4b8b3b 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -19,15 +19,18 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.flink.configuration.Configuration; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertThrows; + /** * Test cases for delta stream write with compaction. */ @@ -39,10 +42,14 @@ protected void setUp(Configuration conf) { conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); } - @Disabled @Test - public void testIndexStateBootstrap() { - // Ignore the index bootstrap because we only support parquet load now. + public void testAppendOnly() throws Exception { + conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); + conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); + funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); + assertThrows(IllegalArgumentException.class, () -> { + funcWrapper.openFunction(); + }, "APPEND_ONLY mode only support in COPY_ON_WRITE table"); } protected Map getMiniBatchExpected() { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 50ecf543e70ec..5a00da8cfe805 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -71,7 +71,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -/** Data set for testing, also some utilities to check the results. */ +/** + * Data set for testing, also some utilities to check the results. + */ public class TestData { public static List DATA_SET_INSERT = Arrays.asList( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -126,6 +128,7 @@ public class TestData { ); public static List DATA_SET_INSERT_DUPLICATES = new ArrayList<>(); + static { IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -133,6 +136,7 @@ public class TestData { } public static List DATA_SET_INSERT_SAME_KEY = new ArrayList<>(); + static { IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -236,8 +240,8 @@ public static String rowDataToString(List rows) { /** * Write a list of row data with Hoodie format base on the given configuration. 
* - * @param dataBuffer The data buffer to write - * @param conf The flink configuration + * @param dataBuffer The data buffer to write + * @param conf The flink configuration * @throws Exception if error occurs */ public static void writeData( @@ -281,8 +285,8 @@ public static void assertRowsEquals(List rows, String expected) { * Sort the {@code rows} using field at index {@code orderingPos} and asserts * it equals with the expected string {@code expected}. * - * @param rows Actual result rows - * @param expected Expected string of the sorted rows + * @param rows Actual result rows + * @param expected Expected string of the sorted rows * @param orderingPos Field position for ordering */ public static void assertRowsEquals(List rows, String expected, int orderingPos) { @@ -360,8 +364,10 @@ public static void checkWrittenData( assert baseFile.isDirectory(); FileFilter filter = file -> !file.getName().startsWith("."); File[] partitionDirs = baseFile.listFiles(filter); + assertNotNull(partitionDirs); assertThat(partitionDirs.length, is(partitions)); + for (File partitionDir : partitionDirs) { File[] dataFiles = partitionDir.listFiles(filter); assertNotNull(dataFiles); @@ -381,13 +387,44 @@ public static void checkWrittenData( } } + public static void checkWrittenAllData( + File baseFile, + Map> expected, + int partitions) throws IOException { + assert baseFile.isDirectory(); + FileFilter filter = file -> !file.getName().startsWith("."); + File[] partitionDirs = baseFile.listFiles(filter); + + assertNotNull(partitionDirs); + assertThat(partitionDirs.length, is(partitions)); + + for (File partitionDir : partitionDirs) { + File[] dataFiles = partitionDir.listFiles(filter); + assertNotNull(dataFiles); + + List readBuffer = new ArrayList<>(); + for (File dataFile : dataFiles) { + ParquetReader reader = AvroParquetReader + .builder(new Path(dataFile.getAbsolutePath())).build(); + GenericRecord nextRecord = reader.read(); + while (nextRecord != null) { + readBuffer.add(filterOutVariables(nextRecord)); + nextRecord = reader.read(); + } + readBuffer.sort(Comparator.naturalOrder()); + } + + assertThat(readBuffer, is(expected.get(partitionDir.getName()))); + } + } + /** * Checks the source data are written as expected. * *

Note: Replace it with the Flink reader when it is supported. * - * @param basePath The file base to check, should be a directory - * @param expected The expected results mapping, the key should be the partition path + * @param basePath The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path */ public static void checkWrittenFullData( File basePath, @@ -431,12 +468,12 @@ public static void checkWrittenFullData( * *

Note: Replace it with the Flink reader when it is supported. * - * @param fs The file system + * @param fs The file system * @param latestInstant The latest committed instant of current table - * @param baseFile The file base to check, should be a directory - * @param expected The expected results mapping, the key should be the partition path - * @param partitions The expected partition number - * @param schema The read schema + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * @param partitions The expected partition number + * @param schema The read schema */ public static void checkWrittenDataMOR( FileSystem fs,
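Usage note (an illustrative sketch, not part of the patch): the snippet below shows how the append-only path introduced above would typically be enabled from a Flink job. It only uses options and calls that appear in this diff (FlinkOptions.APPEND_ONLY_ENABLE, FlinkOptions.OPERATION, FlinkOptions.TABLE_TYPE, WriteOperationType.INSERT); the path option and its value are assumed, not taken from the patch.

    // Minimal sketch, assuming a standard Flink/Hudi setup.
    Configuration conf = new Configuration();                                    // org.apache.flink.configuration.Configuration
    conf.setString(FlinkOptions.PATH, "/tmp/hudi/append_only_table");            // assumed table path option/value
    conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());   // append-only is validated against INSERT
    conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true);
    // With the flag set, BucketAssignFunction tags every record with BucketType.APPEND_ONLY and skips the
    // index lookup, and the write client routes those buckets to FlinkCreateHandle, so each checkpoint
    // writes new base files instead of updating existing ones.

The same behavior is reachable through the streamer via the --append-only flag, which forces the INSERT operation in FlinkStreamerConfig.toFlinkConfig as shown above.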