diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4485df6ecf7f1..7bb20afc760be 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -340,54 +340,69 @@ private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();
 
     final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
-    if (queryType.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
-      final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
-      switch (tableType) {
-        case MERGE_ON_READ:
-          final List<MergeOnReadInputSplit> inputSplits;
-          if (!isStreaming) {
-            inputSplits = buildFileIndex(paths);
-            if (inputSplits.size() == 0) {
-              // When there is no input splits, just return an empty source.
-              LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
-              return new CollectionInputFormat<>(Collections.emptyList(), null);
+    switch (queryType) {
+      case FlinkOptions.QUERY_TYPE_SNAPSHOT:
+        final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+        switch (tableType) {
+          case MERGE_ON_READ:
+            final List<MergeOnReadInputSplit> inputSplits;
+            if (!isStreaming) {
+              inputSplits = buildFileIndex(paths);
+              if (inputSplits.size() == 0) {
+                // When there is no input splits, just return an empty source.
+                LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
+                return new CollectionInputFormat<>(Collections.emptyList(), null);
+              }
+            } else {
+              // streaming reader would build the splits automatically.
+              inputSplits = Collections.emptyList();
             }
-          } else {
-            // streaming reader would build the splits automatically.
-            inputSplits = Collections.emptyList();
-          }
-          final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
-              rowType,
-              requiredRowType,
-              tableAvroSchema.toString(),
-              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-              inputSplits);
-          return new MergeOnReadInputFormat(
-              this.conf,
-              FilePathUtils.toFlinkPaths(paths),
-              hoodieTableState,
-              rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
-              "default",
-              this.limit);
-        case COPY_ON_WRITE:
-          FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
-              FilePathUtils.toFlinkPaths(paths),
-              this.schema.getFieldNames(),
-              this.schema.getFieldDataTypes(),
-              this.requiredPos,
-              "default",
-              this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
-              getParquetConf(this.conf, this.hadoopConf),
-              this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
-          );
-          format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
-          return format;
-        default:
-          throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
-      }
-    } else {
-      throw new HoodieException("Invalid query type : '" + queryType + "'. Only '"
-          + FlinkOptions.QUERY_TYPE_SNAPSHOT + "' is supported now");
+            final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+                rowType,
+                requiredRowType,
+                tableAvroSchema.toString(),
+                AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+                inputSplits);
+            return new MergeOnReadInputFormat(
+                this.conf,
+                FilePathUtils.toFlinkPaths(paths),
+                hoodieTableState,
+                rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
+                "default",
+                this.limit);
+          case COPY_ON_WRITE:
+            FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+                FilePathUtils.toFlinkPaths(paths),
+                this.schema.getFieldNames(),
+                this.schema.getFieldDataTypes(),
+                this.requiredPos,
+                "default",
+                this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
+                getParquetConf(this.conf, this.hadoopConf),
+                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+            );
+            format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+            return format;
+          default:
+            throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
+        }
+      case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
+        FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+            FilePathUtils.toFlinkPaths(paths),
+            this.schema.getFieldNames(),
+            this.schema.getFieldDataTypes(),
+            this.requiredPos,
+            "default",
+            this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
+            getParquetConf(this.conf, this.hadoopConf),
+            this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+        );
+        format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+        return format;
+      default:
+        String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType,
+            FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+        throw new HoodieException(errMsg);
     }
   }
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 8b39ee74e04e3..ed5f1323da142 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -151,6 +151,30 @@ void testStreamWriteBatchRead() {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @Test
+  void testStreamWriteBatchReadOptimized() {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    // read optimized is supported for both MOR and COW table,
+    // test MOR streaming write with compaction then reads as
+    // query type 'read_optimized'.
+    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = CollectionUtil.iterableToList(
+        () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndRead(ExecMode execMode) {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
index 992495642e517..446aba0a75bd8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -33,7 +33,6 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -152,7 +151,6 @@ public void cancel() {
 
     private void loadDataBuffer() {
       try {
-        new File(this.path.toString()).exists();
         this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
       } catch (IOException e) {
         throw new RuntimeException("Read file " + this.path + " error", e);
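
Usage note (illustrative, not part of the patch): with the new 'read_optimized' query type, a Flink batch job reads only the compacted parquet base files of a MERGE_ON_READ table and skips the log-file merging that the default 'snapshot' type performs. The sketch below shows one way this could be exercised from plain Flink SQL; the table name, schema and base path are placeholders, and it assumes the 'hudi' connector identifier plus the 'path', 'table.type' and 'hoodie.datasource.query.type' option keys declared in FlinkOptions.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadOptimizedQueryExample {
  public static void main(String[] args) {
    // Batch environment; with query type 'read_optimized' only base files are scanned,
    // so no log-file merging happens at read time.
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());
    tableEnv.executeSql(
        "CREATE TABLE hudi_ro (\n"
            + "  uuid VARCHAR(20),\n"
            + "  name VARCHAR(10),\n"
            + "  age INT,\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'hudi',\n"
            + "  'path' = 'file:///tmp/hudi_table',\n"                      // placeholder base path
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'hoodie.datasource.query.type' = 'read_optimized'\n"       // query type added by this patch
            + ")");
    tableEnv.executeSql("SELECT * FROM hudi_ro").print();
  }
}

Setting 'hoodie.datasource.query.type' back to its default value 'snapshot' restores the merged (snapshot) view of the same table.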