Merged
109 changes: 62 additions & 47 deletions hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -340,54 +340,69 @@ private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
     final RowType requiredRowType = (RowType) getProducedDataType().notNull().getLogicalType();

     final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
-    if (queryType.equals(FlinkOptions.QUERY_TYPE_SNAPSHOT)) {
-      final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
-      switch (tableType) {
-        case MERGE_ON_READ:
-          final List<MergeOnReadInputSplit> inputSplits;
-          if (!isStreaming) {
-            inputSplits = buildFileIndex(paths);
-            if (inputSplits.size() == 0) {
-              // When there is no input splits, just return an empty source.
-              LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
-              return new CollectionInputFormat<>(Collections.emptyList(), null);
-            }
-          } else {
-            // streaming reader would build the splits automatically.
-            inputSplits = Collections.emptyList();
-          }
-          final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
-              rowType,
-              requiredRowType,
-              tableAvroSchema.toString(),
-              AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-              inputSplits);
-          return new MergeOnReadInputFormat(
-              this.conf,
-              FilePathUtils.toFlinkPaths(paths),
-              hoodieTableState,
-              rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
-              "default",
-              this.limit);
-        case COPY_ON_WRITE:
-          FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
-              FilePathUtils.toFlinkPaths(paths),
-              this.schema.getFieldNames(),
-              this.schema.getFieldDataTypes(),
-              this.requiredPos,
-              "default",
-              this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
-              getParquetConf(this.conf, this.hadoopConf),
-              this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
-          );
-          format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
-          return format;
-        default:
-          throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
-      }
-    } else {
-      throw new HoodieException("Invalid query type : '" + queryType + "'. Only '"
-          + FlinkOptions.QUERY_TYPE_SNAPSHOT + "' is supported now");
+    switch (queryType) {
+      case FlinkOptions.QUERY_TYPE_SNAPSHOT:
+        final HoodieTableType tableType = HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
+        switch (tableType) {
+          case MERGE_ON_READ:
+            final List<MergeOnReadInputSplit> inputSplits;
+            if (!isStreaming) {
+              inputSplits = buildFileIndex(paths);
+              if (inputSplits.size() == 0) {
+                // When there is no input splits, just return an empty source.
+                LOG.warn("No input splits generate for MERGE_ON_READ input format, returns empty collection instead");
+                return new CollectionInputFormat<>(Collections.emptyList(), null);
+              }
+            } else {
+              // streaming reader would build the splits automatically.
+              inputSplits = Collections.emptyList();
+            }
+            final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+                rowType,
+                requiredRowType,
+                tableAvroSchema.toString(),
+                AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+                inputSplits);
+            return new MergeOnReadInputFormat(
+                this.conf,
+                FilePathUtils.toFlinkPaths(paths),
+                hoodieTableState,
+                rowDataType.getChildren(), // use the explicit fields data type because the AvroSchemaConverter is not very stable.
+                "default",
+                this.limit);
+          case COPY_ON_WRITE:
+            FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+                FilePathUtils.toFlinkPaths(paths),
+                this.schema.getFieldNames(),
+                this.schema.getFieldDataTypes(),
+                this.requiredPos,
+                "default",
+                this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
+                getParquetConf(this.conf, this.hadoopConf),
+                this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+            );
+            format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+            return format;
+          default:
+            throw new HoodieException("Unexpected table type: " + this.conf.getString(FlinkOptions.TABLE_TYPE));
+        }
+      case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
+        FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+            FilePathUtils.toFlinkPaths(paths),
+            this.schema.getFieldNames(),
+            this.schema.getFieldDataTypes(),
+            this.requiredPos,
+            "default",
+            this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // ParquetInputFormat always uses the limit value
+            getParquetConf(this.conf, this.hadoopConf),
+            this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+        );
+        format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+        return format;
+      default:
+        String errMsg = String.format("Invalid query type : '%s', options ['%s', '%s'] are supported now", queryType,
+            FlinkOptions.QUERY_TYPE_SNAPSHOT, FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+        throw new HoodieException(errMsg);
     }
   }
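With this change, the new READ_OPTIMIZED branch serves the query through a CopyOnWriteInputFormat filtered to the latest base files, so for a MERGE_ON_READ table it returns only data that has already been compacted into Parquet. The sketch below shows how such a query might be issued from a Flink job; it is illustrative only and not part of this patch. The table schema, the path '/tmp/hudi/t1', and the option keys 'table.type' and 'hoodie.datasource.query.type' are assumptions meant to mirror FlinkOptions.TABLE_TYPE and FlinkOptions.QUERY_TYPE, and may differ across Hudi releases.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadOptimizedQueryExample {
  public static void main(String[] args) {
    // Set up a Flink SQL environment; a real job would reuse an existing one.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Hypothetical Hudi table: schema, path, and option keys are placeholders.
    tableEnv.executeSql(
        "CREATE TABLE t1 ("
            + " uuid VARCHAR(20),"
            + " name VARCHAR(10),"
            + " age INT,"
            + " ts TIMESTAMP(3),"
            + " PRIMARY KEY (uuid) NOT ENFORCED"
            + ") WITH ("
            + " 'connector' = 'hudi',"
            + " 'path' = '/tmp/hudi/t1',"
            + " 'table.type' = 'MERGE_ON_READ',"
            + " 'hoodie.datasource.query.type' = 'read_optimized'"
            + ")");

    // This SELECT goes through the READ_OPTIMIZED branch added above,
    // i.e. a CopyOnWriteInputFormat over the latest base (Parquet) files.
    tableEnv.executeSql("SELECT * FROM t1").print();
  }
}

On a MOR table, frequent compaction keeps read-optimized results close to the latest writes; the new test below does this by setting FlinkOptions.COMPACTION_DELTA_COMMITS to "1".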

@@ -151,6 +151,30 @@ void testStreamWriteBatchRead() {
     assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
   }

+  @Test
+  void testStreamWriteBatchReadOptimized() {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    // read optimized is supported for both MOR and COW tables,
+    // test MOR streaming write with compaction, then read with
+    // query type 'read_optimized'.
+    options.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = CollectionUtil.iterableToList(
+        () -> streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndRead(ExecMode execMode) {
@@ -33,7 +33,6 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;

-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -152,7 +151,6 @@ public void cancel() {

   private void loadDataBuffer() {
     try {
-      new File(this.path.toString()).exists();
       this.dataBuffer = Files.readAllLines(Paths.get(this.path.toUri()));
     } catch (IOException e) {
       throw new RuntimeException("Read file " + this.path + " error", e);