diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 164106a4e876c..1881e2558a9f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -244,6 +244,12 @@ private FlinkOptions() { .withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by" + "skipping over files"); + public static final ConfigOption<Boolean> READ_DATA_DELETE = ConfigOptions + .key("read.data.delete.enabled") + .booleanType() + .defaultValue(false) // by default, delete records are not emitted to readers + .withDescription("Whether to read delete data, default false"); + // ------------------------------------------------------------------------ // Write Options // ------------------------------------------------------------------------ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 2034cb322eb8e..a83830bf3cb40 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -338,6 +338,7 @@ private List buildFileIndex() { final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType(); final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE); + boolean emitDelete = this.conf.getBoolean(FlinkOptions.READ_DATA_DELETE); switch (queryType) { case FlinkOptions.QUERY_TYPE_SNAPSHOT: final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE)); @@ -350,7 +351,7 @@ private List buildFileIndex() { return 
InputFormats.EMPTY_INPUT_FORMAT; } return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, - rowDataType, inputSplits, false); + rowDataType, inputSplits, emitDelete); case COPY_ON_WRITE: return baseFileOnlyInputFormat(); default: @@ -372,7 +373,7 @@ private List buildFileIndex() { return InputFormats.EMPTY_INPUT_FORMAT; } return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema, - rowDataType, result.getInputSplits(), false); + rowDataType, result.getInputSplits(), emitDelete); default: String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType, FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index afe3e809b0c0a..3b218a4c02f22 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -18,6 +18,8 @@ package org.apache.hudi.table; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; import org.apache.hudi.adapter.TestTableEnvs; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; @@ -1185,6 +1187,43 @@ void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Except 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)); } + @Test + void testIncrementalReadWithDeletes() throws Exception { + TableEnvironment tableEnv = batchTableEnv; + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(FlinkOptions.TABLE_NAME, "t1"); + conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); + 
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true); + conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1); + conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true); + + // write 4 batches of data set + TestData.writeData(TestData.dataSetInsert(1, 2), conf); + TestData.writeData(TestData.dataSetInsert(3, 4), conf); + TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf); + TestData.writeData(TestData.dataSetInsert(5, 6), conf); + + String startCommit = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 1, true); + String endCommit = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true); + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + List<Row> result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result, TestData.dataSetInsert(2, 3, 4, 5, 6)); + + final String query = String.format("select * from t1/*+ options('read.start-commit'='%s', 'read.end-commit'='%s', 'read.data.delete.enabled'='true')*/", startCommit, endCommit); + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery(query).execute().collect()); + List<RowData> expected = TestData.dataSetInsert(3, 4); + RowData deleteRowData = TestData.deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, TimestampData.fromEpochMillis(5), StringData.fromString("par1")); + expected.add(deleteRowData); + assertRowsEquals(result1, expected); + } + @ParameterizedTest @EnumSource(value = HoodieTableType.class) void testReadWithWiderSchema(HoodieTableType tableType) throws Exception { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index d0cf143318e9d..35bf4b835c370 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -815,7 +815,7 @@ public static BinaryRowData insertRow(RowType rowType, Object... fields) { return row; } - private static BinaryRowData deleteRow(Object... fields) { + public static BinaryRowData deleteRow(Object... fields) { BinaryRowData rowData = insertRow(fields); rowData.setRowKind(RowKind.DELETE); return rowData;