diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java index e1fd1618b77af..70ee473d245dd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java @@ -19,5 +19,5 @@ package org.apache.hudi.table.action.commit; public enum BucketType { - UPDATE, INSERT, APPEND_ONLY + UPDATE, INSERT } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 71ca1b6e8056b..05e4481ecfbe7 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -56,7 +56,6 @@ import org.apache.hudi.table.HoodieTimelineArchiveLog; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.compact.FlinkCompactHelpers; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade; import org.apache.hudi.util.FlinkClientUtil; @@ -409,12 +408,6 @@ public void cleanHandlesGracefully() { final HoodieRecordLocation loc = record.getCurrentLocation(); final String fileID = loc.getFileId(); final String partitionPath = record.getPartitionPath(); - // append only mode always use FlinkCreateHandle - if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) { - return new FlinkCreateHandle<>(config, instantTime, table, partitionPath, - fileID, table.getTaskContextSupplier()); - } - if (bucketToHandles.containsKey(fileID)) { MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID); if (lastHandle.shouldReplace()) { @@ -431,8 +424,7 @@ public void cleanHandlesGracefully() { if (isDelta) { writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, table.getTaskContextSupplier()); - } else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) { - // use the same handle for insert bucket + } else if (loc.getInstantTime().equals("I")) { writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath, fileID, table.getTaskContextSupplier()); } else { diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java index 41d06667f9574..987f3350d5711 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java @@ -25,7 +25,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; -import org.apache.hudi.table.action.commit.BucketType; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -81,7 +80,7 @@ protected boolean needsUpdateLocation() { @Override protected boolean isUpdateRecord(HoodieRecord hoodieRecord) { return hoodieRecord.getCurrentLocation() != null - && 
hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name()); + && hoodieRecord.getCurrentLocation().getInstantTime().equals("U"); } @Override diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java index 49d8918b9f191..3ff579fed4112 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkCreateHandle.java @@ -30,7 +30,6 @@ import org.apache.avro.Schema; import org.apache.hadoop.fs.Path; -import org.apache.hudi.table.MarkerFiles; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -71,17 +70,6 @@ public FlinkCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTab } } - @Override - protected void createMarkerFile(String partitionPath, String dataFileName) { - // In some rare cases, the task was pulled up again with same write file name, - // for e.g, reuse the small log files from last commit instant. - - // Just skip the marker file creation if it already exists, the new data would append to - // the file directly. - MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime); - markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType()); - } - /** * The flink checkpoints start in sequence and asynchronously, when one write task finish the checkpoint(A) * (thus the fs view got the written data files some of which may be invalid), diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 7dcc2406b66d6..5cfd28be2c1dc 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -102,7 +102,9 @@ public HoodieWriteMetadata> execute(List> inpu final HoodieRecord record = inputRecords.get(0); final String partitionPath = record.getPartitionPath(); final String fileId = record.getCurrentLocation().getFileId(); - final BucketType bucketType = BucketType.valueOf(record.getCurrentLocation().getInstantTime()); + final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I") + ? BucketType.INSERT + : BucketType.UPDATE; handleUpsertPartition( instantTime, partitionPath, @@ -183,7 +185,6 @@ protected Iterator> handleUpsertPartition( } else { switch (bucketType) { case INSERT: - case APPEND_ONLY: return handleInsert(fileIdHint, recordItr); case UPDATE: return handleUpdate(partitionPath, fileIdHint, recordItr); diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 1af4459c3f3f3..cd309d12b680e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -186,12 +186,6 @@ private FlinkOptions() { .defaultValue(TABLE_TYPE_COPY_ON_WRITE) .withDescription("Type of table to write. 
COPY_ON_WRITE (or) MERGE_ON_READ"); - public static final ConfigOption APPEND_ONLY_ENABLE = ConfigOptions - .key("append_only.enable") - .booleanType() - .defaultValue(false) - .withDescription("Whether to write data to new baseFile without index, only support in COW, default false"); - public static final ConfigOption OPERATION = ConfigOptions .key("write.operation") .stringType() diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 5cc239bca7f35..75a3454300073 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -28,13 +28,11 @@ import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; -import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -57,9 +55,9 @@ * it then assigns the bucket with ID using the {@link BucketAssigner}. * *
<p>
All the records are tagged with HoodieRecordLocation, instead of real instant time, - * INSERT record uses "INSERT" and UPSERT record uses "UPDATE" as instant time. There is no need to keep + * INSERT record uses "I" and UPSERT record uses "U" as instant time. There is no need to keep * the "real" instant time for each record, the bucket ID (partition path & fileID) actually decides - * where the record should write to. The "INSERT" and "UPDATE" tags are only used for downstream to decide whether + * where the record should write to. The "I" and "U" tags are only used for downstream to decide whether * the data bucket is an INSERT or an UPSERT, we should factor the tags out when the underneath writer * supports specifying the bucket type explicitly. * @@ -108,18 +106,11 @@ public class BucketAssignFunction> */ private final boolean globalIndex; - private final boolean appendOnly; - public BucketAssignFunction(Configuration conf) { this.conf = conf; this.isChangingRecords = WriteOperationType.isChangingRecords( WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); this.globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); - this.appendOnly = conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE); - if (appendOnly) { - ValidationUtils.checkArgument(conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.COPY_ON_WRITE.name()), - "APPEND_ONLY mode only support in COPY_ON_WRITE table"); - } } @Override @@ -179,33 +170,25 @@ private void processRecord(HoodieRecord record, Collector out) throws Exce final String partitionPath = hoodieKey.getPartitionPath(); final HoodieRecordLocation location; - if (appendOnly) { - location = getNewRecordLocation(partitionPath); - this.context.setCurrentKey(recordKey); - record.setCurrentLocation(location); - out.collect((O) record); - return; - } - // Only changing records need looking up the index for the location, // append only records are always recognized as INSERT. HoodieRecordGlobalLocation oldLoc = indexState.value(); if (isChangingRecords && oldLoc != null) { - // Set up the instant time as "UPDATE" to mark the bucket as an update bucket. + // Set up the instant time as "U" to mark the bucket as an update bucket. if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) { if (globalIndex) { // if partition path changes, emit a delete record for old partition path, // then update the index state using location with new partition path. 
HoodieRecord deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()), payloadCreation.createDeletePayload((BaseAvroPayload) record.getData())); - deleteRecord.setCurrentLocation(oldLoc.toLocal(BucketType.UPDATE.name())); + deleteRecord.setCurrentLocation(oldLoc.toLocal("U")); deleteRecord.seal(); out.collect((O) deleteRecord); } location = getNewRecordLocation(partitionPath); updateIndexState(partitionPath, location); } else { - location = oldLoc.toLocal(BucketType.UPDATE.name()); + location = oldLoc.toLocal("U"); this.bucketAssigner.addUpdate(partitionPath, location.getFileId()); } } else { @@ -220,26 +203,17 @@ private void processRecord(HoodieRecord record, Collector out) throws Exce } private HoodieRecordLocation getNewRecordLocation(String partitionPath) { - BucketInfo bucketInfo; - if (appendOnly) { - bucketInfo = this.bucketAssigner.addAppendOnly(partitionPath); - } else { - bucketInfo = this.bucketAssigner.addInsert(partitionPath); - } - + final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath); final HoodieRecordLocation location; switch (bucketInfo.getBucketType()) { case INSERT: - // This is an insert bucket, use HoodieRecordLocation instant time as "INSERT". + // This is an insert bucket, use HoodieRecordLocation instant time as "I". // Downstream operators can then check the instant time to know whether // a record belongs to an insert bucket. - location = new HoodieRecordLocation(BucketType.INSERT.name(), bucketInfo.getFileIdPrefix()); + location = new HoodieRecordLocation("I", bucketInfo.getFileIdPrefix()); break; case UPDATE: - location = new HoodieRecordLocation(BucketType.UPDATE.name(), bucketInfo.getFileIdPrefix()); - break; - case APPEND_ONLY: - location = new HoodieRecordLocation(BucketType.APPEND_ONLY.name(), bucketInfo.getFileIdPrefix()); + location = new HoodieRecordLocation("U", bucketInfo.getFileIdPrefix()); break; default: throw new AssertionError(); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index 965b5575193d6..6d805ce8d83ec 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -140,14 +140,6 @@ public BucketInfo addInsert(String partitionPath) { } // if we have anything more, create new insert buckets, like normal - return getOrCreateNewFileBucket(partitionPath, BucketType.INSERT); - } - - public BucketInfo addAppendOnly(String partitionPath) { - return getOrCreateNewFileBucket(partitionPath, BucketType.APPEND_ONLY); - } - - private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType bucketType) { if (newFileAssignStates.containsKey(partitionPath)) { NewFileAssignState newFileAssignState = newFileAssignStates.get(partitionPath); if (newFileAssignState.canAssign()) { @@ -156,7 +148,7 @@ private BucketInfo getOrCreateNewFileBucket(String partitionPath, BucketType buc final String key = StreamerUtil.generateBucketKey(partitionPath, newFileAssignState.fileId); return bucketInfoMap.get(key); } - BucketInfo bucketInfo = new BucketInfo(bucketType, FSUtils.createNewFileIdPfx(), partitionPath); + BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix()); bucketInfoMap.put(key, bucketInfo); 
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket())); diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index c024d7c5cc178..0b4533f7c606e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -69,9 +69,6 @@ public class FlinkStreamerConfig extends Configuration { @Parameter(names = {"--table-type"}, description = "Type of table. COPY_ON_WRITE (or) MERGE_ON_READ.", required = true) public String tableType; - @Parameter(names = {"--append-only"}, description = "Write data to new parquet in every checkpoint. Only support in COPY_ON_WRITE table.", required = true) - public Boolean appendOnly = false; - @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for " + "hoodie client, schema provider, key generator and data source. For hoodie client props, sane defaults are " + "used, but recommend use to provide basic things like metrics endpoints, hive configs etc. For sources, refer" @@ -293,13 +290,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName); // copy_on_write works same as COPY_ON_WRITE conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase()); - conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, config.appendOnly); - if (config.appendOnly) { - // append only should use insert operation - conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); - } else { - conf.setString(FlinkOptions.OPERATION, config.operation.value()); - } + conf.setString(FlinkOptions.OPERATION, config.operation.value()); conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField); conf.setString(FlinkOptions.PAYLOAD_CLASS, config.payloadClassName); conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes); diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 2c2193c78d403..2eeb8f58b82a2 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -18,7 +18,6 @@ package org.apache.hudi.table; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; @@ -145,11 +144,6 @@ private static void setupConfOptions( TableSchema schema) { // table name conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); - // append only - if (conf.getBoolean(FlinkOptions.APPEND_ONLY_ENABLE)) { - // append only should use insert operation - conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); - } // hoodie key about options setupHoodieKeyOptions(conf, table); // cleaning options diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 5f6f62c63b065..b3338a9083cb9 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -23,7 
+23,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; @@ -45,7 +44,6 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; -import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -511,83 +509,6 @@ public void testInsertWithDeduplication() throws Exception { checkWrittenData(tempFile, expected, 1); } - @Test - public void testAppendOnly() throws Exception { - // reset the config option - conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0006); // 630 bytes batch size - conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, false); - conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); - conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - - // open the function and ingest data - funcWrapper.openFunction(); - // Each record is 208 bytes. so 4 records expect to trigger a mini-batch write - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - // this triggers the data write and event send - funcWrapper.checkpointFunction(1); - Map> dataBuffer = funcWrapper.getDataBuffer(); - assertThat("All data should be flushed out", dataBuffer.size(), is(0)); - - final OperatorEvent event1 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event2 = funcWrapper.getNextEvent(); - assertThat("The operator expect to send an event", event2, instanceOf(WriteMetadataEvent.class)); - - funcWrapper.getCoordinator().handleEventFromOperator(0, event1); - funcWrapper.getCoordinator().handleEventFromOperator(0, event2); - assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event"); - - String instant = funcWrapper.getWriteClient() - .getLastPendingInstant(getTableType()); - - funcWrapper.checkpointComplete(1); - - Map> expected = new HashMap<>(); - - expected.put("par1", Arrays.asList( - "id1,par1,id1,Danny,23,0,par1", - "id1,par1,id1,Danny,23,1,par1", - "id1,par1,id1,Danny,23,2,par1", - "id1,par1,id1,Danny,23,3,par1", - "id1,par1,id1,Danny,23,4,par1")); - - TestData.checkWrittenAllData(tempFile, expected, 1); - - // started a new instant already - checkInflightInstant(funcWrapper.getWriteClient()); - checkInstantState(funcWrapper.getWriteClient(), HoodieInstant.State.COMPLETED, instant); - - // insert duplicates again - for (RowData rowData : TestData.DATA_SET_INSERT_SAME_KEY) { - funcWrapper.invoke(rowData); - } - - funcWrapper.checkpointFunction(2); - - final OperatorEvent event3 = funcWrapper.getNextEvent(); // remove the first event first - final OperatorEvent event4 = funcWrapper.getNextEvent(); - funcWrapper.getCoordinator().handleEventFromOperator(0, event3); - funcWrapper.getCoordinator().handleEventFromOperator(0, event4); - funcWrapper.checkpointComplete(2); - - // Same the original base file content. 
- expected.put("par1", Arrays.asList( - "id1,par1,id1,Danny,23,0,par1", - "id1,par1,id1,Danny,23,0,par1", - "id1,par1,id1,Danny,23,1,par1", - "id1,par1,id1,Danny,23,1,par1", - "id1,par1,id1,Danny,23,2,par1", - "id1,par1,id1,Danny,23,2,par1", - "id1,par1,id1,Danny,23,3,par1", - "id1,par1,id1,Danny,23,3,par1", - "id1,par1,id1,Danny,23,4,par1", - "id1,par1,id1,Danny,23,4,par1")); - TestData.checkWrittenAllData(tempFile, expected, 1); - } - @Test public void testInsertWithSmallBufferSize() throws Exception { // reset the config option diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java index b983e8c0ebbf6..07e23b56edc92 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java @@ -23,14 +23,12 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestData; @@ -39,7 +37,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.FileSystem; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import java.io.File; import java.util.Comparator; @@ -47,8 +44,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.junit.jupiter.api.Assertions.assertThrows; - /** * Test cases for delta stream write. */ @@ -91,16 +86,6 @@ protected Map getExpectedBeforeCheckpointComplete() { return EXPECTED1; } - @Test - public void testAppendOnly() throws Exception { - conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); - conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - assertThrows(IllegalArgumentException.class, () -> { - funcWrapper.openFunction(); - }, "APPEND_ONLY mode only support in COPY_ON_WRITE table"); - } - protected Map getMiniBatchExpected() { Map expected = new HashMap<>(); // MOR mode merges the messages with the same key. 
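The bucket-tagging convention this patch settles on can be summarized outside the diff: BucketAssignFunction stamps each record's HoodieRecordLocation with the literal string "I" (insert bucket) or "U" (update bucket) in place of a real instant time, and the executor and handle changes above recover the bucket type from that tag. The following standalone sketch only mirrors that logic; the class BucketTagSketch and its method names are illustrative and are not part of the patch.

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.table.action.commit.BucketType;

public class BucketTagSketch {

  // Mirrors BaseFlinkCommitActionExecutor: a location tagged "I" is an insert bucket,
  // anything else is treated as an update bucket.
  static BucketType bucketTypeOf(HoodieRecordLocation loc) {
    return "I".equals(loc.getInstantTime()) ? BucketType.INSERT : BucketType.UPDATE;
  }

  // Mirrors FlinkAppendHandle#isUpdateRecord: only records tagged "U" count as updates.
  static boolean isUpdateRecord(HoodieRecordLocation loc) {
    return loc != null && "U".equals(loc.getInstantTime());
  }

  public static void main(String[] args) {
    HoodieRecordLocation insertLoc = new HoodieRecordLocation("I", "fileId-0001");
    HoodieRecordLocation updateLoc = new HoodieRecordLocation("U", "fileId-0001");
    System.out.println(bucketTypeOf(insertLoc));   // INSERT
    System.out.println(isUpdateRecord(updateLoc)); // true
  }
}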
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 10e7d3d4b8b3b..13a71ecb80245 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -19,18 +19,15 @@ package org.apache.hudi.sink; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertThrows; - /** * Test cases for delta stream write with compaction. */ @@ -42,14 +39,10 @@ protected void setUp(Configuration conf) { conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); } + @Disabled @Test - public void testAppendOnly() throws Exception { - conf.setBoolean(FlinkOptions.APPEND_ONLY_ENABLE, true); - conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value()); - funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf); - assertThrows(IllegalArgumentException.class, () -> { - funcWrapper.openFunction(); - }, "APPEND_ONLY mode only support in COPY_ON_WRITE table"); + public void testIndexStateBootstrap() { + // Ignore the index bootstrap because we only support parquet load now. } protected Map getMiniBatchExpected() { diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 5a00da8cfe805..50ecf543e70ec 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -71,9 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -/** - * Data set for testing, also some utilities to check the results. - */ +/** Data set for testing, also some utilities to check the results. */ public class TestData { public static List DATA_SET_INSERT = Arrays.asList( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -128,7 +126,6 @@ public class TestData { ); public static List DATA_SET_INSERT_DUPLICATES = new ArrayList<>(); - static { IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -136,7 +133,6 @@ public class TestData { } public static List DATA_SET_INSERT_SAME_KEY = new ArrayList<>(); - static { IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_SAME_KEY.add( insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, @@ -240,8 +236,8 @@ public static String rowDataToString(List rows) { /** * Write a list of row data with Hoodie format base on the given configuration. 
* - * @param dataBuffer The data buffer to write - * @param conf The flink configuration + * @param dataBuffer The data buffer to write + * @param conf The flink configuration * @throws Exception if error occurs */ public static void writeData( @@ -285,8 +281,8 @@ public static void assertRowsEquals(List rows, String expected) { * Sort the {@code rows} using field at index {@code orderingPos} and asserts * it equals with the expected string {@code expected}. * - * @param rows Actual result rows - * @param expected Expected string of the sorted rows + * @param rows Actual result rows + * @param expected Expected string of the sorted rows * @param orderingPos Field position for ordering */ public static void assertRowsEquals(List rows, String expected, int orderingPos) { @@ -364,10 +360,8 @@ public static void checkWrittenData( assert baseFile.isDirectory(); FileFilter filter = file -> !file.getName().startsWith("."); File[] partitionDirs = baseFile.listFiles(filter); - assertNotNull(partitionDirs); assertThat(partitionDirs.length, is(partitions)); - for (File partitionDir : partitionDirs) { File[] dataFiles = partitionDir.listFiles(filter); assertNotNull(dataFiles); @@ -387,44 +381,13 @@ public static void checkWrittenData( } } - public static void checkWrittenAllData( - File baseFile, - Map> expected, - int partitions) throws IOException { - assert baseFile.isDirectory(); - FileFilter filter = file -> !file.getName().startsWith("."); - File[] partitionDirs = baseFile.listFiles(filter); - - assertNotNull(partitionDirs); - assertThat(partitionDirs.length, is(partitions)); - - for (File partitionDir : partitionDirs) { - File[] dataFiles = partitionDir.listFiles(filter); - assertNotNull(dataFiles); - - List readBuffer = new ArrayList<>(); - for (File dataFile : dataFiles) { - ParquetReader reader = AvroParquetReader - .builder(new Path(dataFile.getAbsolutePath())).build(); - GenericRecord nextRecord = reader.read(); - while (nextRecord != null) { - readBuffer.add(filterOutVariables(nextRecord)); - nextRecord = reader.read(); - } - readBuffer.sort(Comparator.naturalOrder()); - } - - assertThat(readBuffer, is(expected.get(partitionDir.getName()))); - } - } - /** * Checks the source data are written as expected. * *
<p>
Note: Replace it with the Flink reader when it is supported. * - * @param basePath The file base to check, should be a directory - * @param expected The expected results mapping, the key should be the partition path + * @param basePath The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path */ public static void checkWrittenFullData( File basePath, @@ -468,12 +431,12 @@ public static void checkWrittenFullData( * *
<p>
Note: Replace it with the Flink reader when it is supported. * - * @param fs The file system + * @param fs The file system * @param latestInstant The latest committed instant of current table - * @param baseFile The file base to check, should be a directory - * @param expected The expected results mapping, the key should be the partition path - * @param partitions The expected partition number - * @param schema The read schema + * @param baseFile The file base to check, should be a directory + * @param expected The expected results mapping, the key should be the partition path + * @param partitions The expected partition number + * @param schema The read schema */ public static void checkWrittenDataMOR( FileSystem fs,
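The TestData helpers touched above verify what the writers produced by listing each partition directory and reading the parquet base files back with AvroParquetReader, then comparing the sorted rows against the expected map. A condensed, standalone version of that read-back loop is sketched below; the class name ParquetReadBackSketch is illustrative, and GenericRecord#toString stands in for the test's filterOutVariables helper, which is not reproduced here.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ParquetReadBackSketch {

  // Reads every non-hidden data file under one partition directory and returns the
  // record strings in natural order, mirroring TestData#checkWrittenData.
  static List<String> readPartition(File partitionDir) throws IOException {
    FileFilter filter = file -> !file.getName().startsWith(".");
    File[] dataFiles = partitionDir.listFiles(filter);
    List<String> readBuffer = new ArrayList<>();
    if (dataFiles == null) {
      return readBuffer;
    }
    for (File dataFile : dataFiles) {
      try (ParquetReader<GenericRecord> reader = AvroParquetReader
          .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build()) {
        GenericRecord nextRecord = reader.read();
        while (nextRecord != null) {
          readBuffer.add(nextRecord.toString());
          nextRecord = reader.read();
        }
      }
    }
    readBuffer.sort(Comparator.naturalOrder());
    return readBuffer;
  }
}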