@@ -244,6 +244,12 @@ private FlinkOptions() {
.withDescription("Enables data-skipping allowing queries to leverage indexes to reduce the search space by"
+ "skipping over files");

public static final ConfigOption<Boolean> READ_DATA_DELETE = ConfigOptions
.key("read.data.delete.enabled")
.booleanType()
.defaultValue(false) // delete records are not emitted by default
.withDescription("Whether to emit delete records when reading, default false");

// ------------------------------------------------------------------------
// Write Options
// ------------------------------------------------------------------------
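For context, the new option can also be set per query through Flink's dynamic table options hint, which is how the test further down exercises it. A minimal sketch in Java, assuming the Hudi Flink bundle is on the classpath; the table schema, path and commit instants are placeholders, not taken from this change:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadDeleteExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // the /*+ options(...) */ hint may need to be enabled explicitly, depending on the Flink version
    tableEnv.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true");
    // illustrative MERGE_ON_READ table; schema and path are placeholders
    tableEnv.executeSql(
        "create table t1 (uuid varchar(20) primary key not enforced, name varchar(10), age int,"
            + " ts timestamp(3), `partition` varchar(20))"
            + " partitioned by (`partition`)"
            + " with ('connector' = 'hudi', 'path' = '/tmp/t1', 'table.type' = 'MERGE_ON_READ')");
    // per-query dynamic options: ask the incremental read to also emit delete records
    tableEnv.executeSql(
        "select * from t1 /*+ options("
            + "'read.start-commit' = '<start instant>', "
            + "'read.end-commit' = '<end instant>', "
            + "'read.data.delete.enabled' = 'true') */")
        .print();
  }
}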
@@ -338,6 +338,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();

final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
boolean emitDelete = this.conf.getBoolean(FlinkOptions.READ_DATA_DELETE);
switch (queryType) {
case FlinkOptions.QUERY_TYPE_SNAPSHOT:
final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
@@ -350,7 +351,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, inputSplits, false);
rowDataType, inputSplits, emitDelete);
case COPY_ON_WRITE:
return baseFileOnlyInputFormat();
default:
@@ -372,7 +373,7 @@ private List<MergeOnReadInputSplit> buildFileIndex() {
return InputFormats.EMPTY_INPUT_FORMAT;
}
return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
rowDataType, result.getInputSplits(), false);
rowDataType, result.getInputSplits(), emitDelete);
default:
String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s', '%s'] are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
@@ -18,6 +18,8 @@

package org.apache.hudi.table;

import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.hudi.adapter.TestTableEnvs;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
@@ -1185,6 +1187,43 @@ void testIncrementalReadArchivedCommits(HoodieTableType tableType) throws Exception {
11, 12, 13, 14, 15, 16, 17, 18, 19, 20));
}

@Test
void testIncrementalReadWithDeletes() throws Exception {
TableEnvironment tableEnv = batchTableEnv;
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setString(FlinkOptions.TABLE_NAME, "t1");
conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setBoolean(FlinkOptions.CHANGELOG_ENABLED, true);

// write 4 batches of data
TestData.writeData(TestData.dataSetInsert(1, 2), conf);
TestData.writeData(TestData.dataSetInsert(3, 4), conf);
TestData.writeData(TestData.DATA_SET_INSERT_UPDATE_DELETE, conf);
TestData.writeData(TestData.dataSetInsert(5, 6), conf);

String startCommit = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 1, true);
String endCommit = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name())
.end();
tableEnv.executeSql(hoodieTableDDL);

// a plain snapshot read no longer contains the deleted record
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(result, TestData.dataSetInsert(2, 3, 4, 5, 6));

// incremental read between startCommit and endCommit, with delete records emitted
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s', 'read.end-commit'='%s', 'read.data.delete.enabled'='true')*/", startCommit, endCommit);
List<Row> result1 = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery(query).execute().collect());
// expected: the inserted rows in the range plus the delete record for id1
List<RowData> expected = TestData.dataSetInsert(3, 4);
RowData deleteRowData = TestData.deleteRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22, TimestampData.fromEpochMillis(5), StringData.fromString("par1"));
expected.add(deleteRowData);
assertRowsEquals(result1, expected);
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
@@ -815,7 +815,7 @@ public static BinaryRowData insertRow(RowType rowType, Object... fields) {
return row;
}

private static BinaryRowData deleteRow(Object... fields) {
public static BinaryRowData deleteRow(Object... fields) {
BinaryRowData rowData = insertRow(fields);
rowData.setRowKind(RowKind.DELETE);
return rowData;
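With read.data.delete.enabled turned on, the deleted records surface as RowData whose row kind is DELETE, which is what the now-public deleteRow helper above builds for the expected results. A hedged sketch of how downstream code might branch on the row kind; the handler names are illustrative and not part of this change:

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class DeleteAwareConsumer {
  // illustrative only: route delete records separately from the other changes
  public void consume(RowData row) {
    if (row.getRowKind() == RowKind.DELETE) {
      handleDelete(row); // hypothetical handler for retracted records
    } else {
      handleUpsert(row); // hypothetical handler for the other row kinds
    }
  }

  private void handleDelete(RowData row) { /* application specific */ }

  private void handleUpsert(RowData row) { /* application specific */ }
}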