@@ -19,7 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,7 +29,6 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -272,39 +271,24 @@ private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
         metaClient.getActiveTimeline().getCommitsTimeline()
             .filterCompletedInstants(), fileStatuses);
-    List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
     String latestCommit = fsView.getLastInstant().get().getTimestamp();
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
-    if (latestFiles.size() > 0) {
-      Map<HoodieBaseFile, List<String>> fileGroup =
-          HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(hadoopConf, latestFiles);
-      return fileGroup.entrySet().stream().map(kv -> {
-        HoodieBaseFile baseFile = kv.getKey();
-        Option<List<String>> logPaths = kv.getValue().size() == 0
-            ? Option.empty()
-            : Option.of(kv.getValue());
-        return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-            baseFile.getPath(), logPaths, latestCommit,
-            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-      }).collect(Collectors.toList());
-    } else {
-      // all the files are logs
-      return Arrays.stream(paths).map(partitionPath -> {
-        String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
-        return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  null, logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-            }).collect(Collectors.toList()); })
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
-    }
+    // generates one input split for each file group
+    return Arrays.stream(paths).map(partitionPath -> {
+      String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
+      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+          .map(fileSlice -> {
+            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+                .sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString())
+                .collect(Collectors.toList()));
+            return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+          }).collect(Collectors.toList()); })
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   public InputFormat<RowData, ?> getInputFormat() {
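
The rewrite above removes the two-way branch in buildFileIndex: instead of grouping log files under the latest base files and only falling back to merged file slices when no base file exists, every partition now goes through getLatestMergedFileSlicesBeforeOrOn, and the base file of a slice is simply optional. Below is a minimal, self-contained sketch of that unified slice-to-split mapping; the types are hypothetical stand-ins for illustration, not the real FileSlice and MergeOnReadInputSplit classes.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Stand-in for a file slice: a file group's latest base file (if any)
// plus its sorted log files.
final class SliceSketch {
  final String baseFilePath;        // null for a log-only file group
  final List<String> logFilePaths;  // sorted log paths, possibly empty

  SliceSketch(String baseFilePath, List<String> logFilePaths) {
    this.baseFilePath = baseFilePath;
    this.logFilePaths = logFilePaths;
  }
}

// Stand-in for an input split handed to the reader.
final class SplitSketch {
  final int splitNum;
  final String basePath;                  // may be null
  final Optional<List<String>> logPaths;  // empty when there are no logs

  SplitSketch(int splitNum, String basePath, Optional<List<String>> logPaths) {
    this.splitNum = splitNum;
    this.basePath = basePath;
    this.logPaths = logPaths;
  }
}

final class BuildSplitsSketch {
  // One split per file slice: base-only, log-only, and mixed slices all
  // take the same code path, so no file group is silently dropped.
  static List<SplitSketch> buildSplits(List<SliceSketch> slices) {
    List<SplitSketch> splits = new ArrayList<>();
    int cnt = 0;
    for (SliceSketch slice : slices) {
      Optional<List<String>> logPaths = slice.logFilePaths.isEmpty()
          ? Optional.empty()
          : Optional.of(slice.logFilePaths);
      splits.add(new SplitSketch(cnt++, slice.baseFilePath, logPaths));
    }
    return splits;
  }
}

Because a slice with no base file still yields a split, a log-only file group (such as a fresh partition that has seen only log writes) is no longer skipped, which is exactly what the new testReadBaseAndLogFiles case below exercises.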
@@ -431,11 +415,12 @@ public Configuration getConf() {
   }
 
   /**
-   * Reload the active timeline view.
+   * Reset the state of the table source.
    */
   @VisibleForTesting
-  public void reloadActiveTimeline() {
+  public void reset() {
    this.metaClient.reloadActiveTimeline();
+    this.requiredPartitions = null;
   }
 
   /**
@@ -92,7 +92,7 @@ void testRead(HoodieTableType tableType) throws Exception {
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -133,8 +133,12 @@ void testReadBaseAndLogFiles() throws Exception {
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
+    // write another commit using logs with separate partition
+    // so the file group has only logs
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -143,6 +147,10 @@
     expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
         + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
         + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+        + "id12,Monica,27,1970-01-01T00:00:00.009,par5, "
+        + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, "
+        + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, "
+        + "id15,Ross,29,1970-01-01T00:00:00.012,par6, "
         + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
         + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
         + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
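
Note where the four new rows land in the expected literal: between id11 and id2, because the expected output is ordered lexicographically on the String id, and every "id1..." key sorts before "id2". A tiny self-contained demonstration of that ordering:

import java.util.Arrays;
import java.util.List;

public class LexOrderSketch {
  public static void main(String[] args) {
    List<String> ids = Arrays.asList("id2", "id11", "id12", "id15", "id1");
    ids.sort(String::compareTo);
    // prints [id1, id11, id12, id15, id2]: "id1..." sorts before "id2"
    System.out.println(ids);
  }
}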
hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java (11 additions, 0 deletions)
@@ -114,6 +114,17 @@ public class TestData {
         TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_SEPARATE_PARTITION = Arrays.asList(
+      insertRow(StringData.fromString("id12"), StringData.fromString("Monica"), 27,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id13"), StringData.fromString("Phoebe"), 31,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id14"), StringData.fromString("Rachel"), 52,
+          TimestampData.fromEpochMillis(11), StringData.fromString("par6")),
+      insertRow(StringData.fromString("id15"), StringData.fromString("Ross"), 29,
+          TimestampData.fromEpochMillis(12), StringData.fromString("par6"))
+  );
+
   public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
   static {
     IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(
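
The insertRow helper used above is defined elsewhere in TestData and simply builds a Flink RowData tagged as an insert. As a rough orientation only, an insertRow-style helper can be sketched as below; this is a hypothetical reconstruction using Flink's public GenericRowData API, not the actual Hudi implementation.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public final class RowSketch {
  // Hypothetical insertRow-style helper: wraps the field objects in a
  // GenericRowData and tags the row with RowKind.INSERT.
  static RowData insertRow(Object... fields) {
    GenericRowData row = new GenericRowData(RowKind.INSERT, fields.length);
    for (int i = 0; i < fields.length; i++) {
      row.setField(i, fields[i]);
    }
    return row;
  }
}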