@@ -713,6 +713,33 @@ public final Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partit
}
}

/**
* Stream all "merged" file-slices before or on an instant time
* for a MERGE_ON_READ table with an index that can index log files (which means it writes pure logs first).
*
* <p>In the streaming read scenario, for better reading efficiency, the user can choose to skip the
* base files produced by compaction. That is to say, the users are allowed to consume only from
* these partitioned log files, which keep the record sequence just like a normal message queue.
*
* <p>NOTE: only the local view is supported.
*
* @param partitionStr Partition Path
* @param maxInstantTime Max Instant Time
*/
public final Stream<FileSlice> getAllLogsMergedFileSliceBeforeOrOn(String partitionStr, String maxInstantTime) {
try {
readLock.lock();
String partition = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partition);
return fetchAllStoredFileGroups(partition)
.filter(fg -> !isFileGroupReplacedBeforeOrOn(fg.getFileGroupId(), maxInstantTime))
.map(fileGroup -> fetchAllLogsMergedFileSlice(fileGroup, maxInstantTime))
.filter(Option::isPresent).map(Option::get).map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
}
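For reference, a minimal caller-side sketch (not part of this patch) of how the new view API can be consumed: it collects the log file paths of one partition in commit order, much like the Flink split generation further down in this diff. The helper name and its placement are hypothetical; fsView, partitionPath and endInstant are assumed to be supplied by the surrounding reader code.

// Sketch only: log-file paths of one partition, compaction outputs skipped.
private static List<String> allLogPathsBeforeOrOn(
    HoodieTableFileSystemView fsView, String partitionPath, String endInstant) {
  return fsView.getAllLogsMergedFileSliceBeforeOrOn(partitionPath, endInstant)
      .flatMap(slice -> slice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()))
      .map(logFile -> logFile.getPath().toString())
      .collect(Collectors.toList());
}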

@Override
public final Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) {
try {
@@ -1076,6 +1103,29 @@ private FileSlice fetchMergedFileSlice(HoodieFileGroup fileGroup, FileSlice file
return fileSlice;
}

/**
* Returns a file slice with the log files of all the file slices in the file group merged.
*
* @param fileGroup File group to which the file slices belong
* @param maxInstantTime The max instant time
*/
private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup fileGroup, String maxInstantTime) {
List<FileSlice> fileSlices = fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
if (fileSlices.size() == 0) {
return Option.empty();
}
if (fileSlices.size() == 1) {
return Option.of(fileSlices.get(0));
}
final FileSlice latestSlice = fileSlices.get(0);
FileSlice merged = new FileSlice(latestSlice.getPartitionPath(), latestSlice.getBaseInstantTime(),
latestSlice.getFileId());

// add log files from the latest slice to the earliest
fileSlices.forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
return Option.of(merged);
}
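To make the merge above concrete, a small sketch under hypothetical file names and instant times; the HoodieLogFile string constructor and the log-file naming used here are assumptions of the example, not taken from this patch.

// Sketch only: a file group "f1" with two slices, latest first.
FileSlice latest = new FileSlice("par1", "003", "f1");
latest.addLogFile(new HoodieLogFile(".f1_003.log.1_0-1-0"));
FileSlice earlier = new FileSlice("par1", "001", "f1");
earlier.addLogFile(new HoodieLogFile(".f1_001.log.1_0-1-0"));
earlier.addLogFile(new HoodieLogFile(".f1_001.log.2_0-1-0"));
// Mirror fetchAllLogsMergedFileSlice: key the merged slice on the latest base instant,
// attach no base file, and collect the log files of every slice.
FileSlice merged = new FileSlice("par1", latest.getBaseInstantTime(), latest.getFileId());
Stream.of(latest, earlier).forEach(slice -> slice.getLogFiles().forEach(merged::addLogFile));
// The merged slice now references all three log files and no base file.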

/**
* Default implementation for fetching latest base-file.
*
@@ -20,6 +20,7 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -220,7 +221,7 @@ public Result inputSplits(
: instants.get(instants.size() - 1).getTimestamp();

List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, instantRange);
fileStatuses, readPartitions, endInstant, instantRange, false);

return Result.instance(inputSplits, endInstant);
}
@@ -235,8 +236,9 @@ public Result inputSplits(
*/
public Result inputSplits(
HoodieTableMetaClient metaClient,
org.apache.hadoop.conf.Configuration hadoopConf,
String issuedInstant) {
@Nullable org.apache.hadoop.conf.Configuration hadoopConf,
String issuedInstant,
boolean cdcEnabled) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
if (commitTimeline.empty()) {
@@ -248,129 +250,93 @@ public Result inputSplits(
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), false);
// when cdc is enabled, returns instant range with nullable boundary
// to filter the reading instants on the timeline
instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), cdcEnabled);
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return Result.EMPTY;
}

String tableName = conf.getString(FlinkOptions.TABLE_NAME);

Set<String> readPartitions;
final FileStatus[] fileStatuses;

if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = getFileIndex();
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();
} else {
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;

readPartitions = getReadPartitions(metadataList);
if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());
}

if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
return Result.EMPTY;
}

final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, instantRange);

return Result.instance(inputSplits, endInstant);
}

/**
* Returns the incremental cdc input splits.
*
* @param metaClient The meta client
* @param issuedInstant The last issued instant, only valid in streaming read
* @return The list of incremental input splits or empty if there are no new instants
*/
public Result inputSplitsCDC(
HoodieTableMetaClient metaClient,
String issuedInstant) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
return Result.EMPTY;
}
List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline, issuedInstant);
// get the latest instant that satisfies condition
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
instantRange = getInstantRange(issuedInstant, instantToIssue.getTimestamp(), true);
} else {
LOG.info("No new instant found for the table under path " + path + ", skip reading");
return Result.EMPTY;
}

Set<String> readPartitions;
final Set<String> readPartitions;
final FileStatus[] fileStatuses;

if (instantRange == null) {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = getFileIndex();
readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());
if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading in path: " + path);
LOG.warn("No partitions found for reading under path: " + path);
return Result.EMPTY;
}
fileStatuses = fileIndex.getFilesInPartitions();

if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in path: " + path);
LOG.warn("No files found for reading under path: " + path);
return Result.EMPTY;
}

final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, null);
fileStatuses, readPartitions, endInstant, null, false);

return Result.instance(inputSplits, endInstant);
} else {
HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
final String endInstant = instantToIssue.getTimestamp();
Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();
// streaming read
if (cdcEnabled) {
// case1: cdc change log enabled
HoodieCDCExtractor extractor = new HoodieCDCExtractor(metaClient, instantRange);
final String endInstant = instantToIssue.getTimestamp();
Map<HoodieFileGroupId, List<HoodieCDCFileSplit>> fileSplits = extractor.extractCDCFileSplits();

if (fileSplits.isEmpty()) {
LOG.warn("No change logs found for reading in path: " + path);
return Result.EMPTY;
}

if (fileSplits.isEmpty()) {
LOG.warn("No change logs found for reading in path: " + path);
return Result.EMPTY;
}
final AtomicInteger cnt = new AtomicInteger(0);
List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
.map(splits ->
new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
.collect(Collectors.toList());
return Result.instance(inputSplits, endInstant);
} else {
// case2: normal streaming read
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn("\n"
+ "--------------------------------------------------------------------------------\n"
+ "---------- caution: the reader has fall behind too much from the writer,\n"
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+ "--------------------------------------------------------------------------------");
}
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
// IMPORTANT: the merged metadata list must be in ascending order by instant time
? mergeList(archivedMetadataList, activeMetadataList)
: activeMetadataList;

final AtomicInteger cnt = new AtomicInteger(0);
List<MergeOnReadInputSplit> inputSplits = fileSplits.entrySet().stream()
.map(splits ->
new CdcInputSplit(cnt.getAndAdd(1), metaClient.getBasePath(), maxCompactionMemoryInBytes,
splits.getKey().getFileId(), splits.getValue().stream().sorted().toArray(HoodieCDCFileSplit[]::new)))
.collect(Collectors.toList());
return Result.instance(inputSplits, endInstant);
readPartitions = getReadPartitions(metadataList);
if (readPartitions.size() == 0) {
LOG.warn("No partitions found for reading under path: " + path);
return Result.EMPTY;
}
fileStatuses = WriteProfiles.getWritePathsOfInstants(path, hadoopConf, metadataList, metaClient.getTableType());

if (fileStatuses.length == 0) {
LOG.warn("No files found for reading under path: " + path);
return Result.EMPTY;
}

final String endInstant = instantToIssue.getTimestamp();
List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline,
fileStatuses, readPartitions, endInstant, instantRange, skipCompaction);

return Result.instance(inputSplits, endInstant);
}
}
}
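A short usage note: with the cdcEnabled flag folded into the signature, callers no longer branch between two entry points. A sketch of the call side, mirroring the monitoring-function change at the end of this diff (field names as in that function):

// Sketch only: one entry point now serves plain streaming reads and CDC reads alike.
IncrementalInputSplits.Result result =
    incrementalInputSplits.inputSplits(metaClient, hadoopConf, issuedInstant, cdcEnabled);
if (result.isEmpty()) {
  // no new completed instants since the last issued one, return early
  return;
}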

@@ -401,12 +367,13 @@ private List<MergeOnReadInputSplit> getInputSplits(
FileStatus[] fileStatuses,
Set<String> readPartitions,
String endInstant,
InstantRange instantRange) {
InstantRange instantRange,
boolean skipBaseFiles) {
final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
return readPartitions.stream()
.map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
.map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, endInstant, skipBaseFiles)
.map(fileSlice -> {
Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -421,6 +388,15 @@ private List<MergeOnReadInputSplit> getInputSplits(
.collect(Collectors.toList());
}

private static Stream<FileSlice> getFileSlices(
HoodieTableFileSystemView fsView,
String relPartitionPath,
String endInstant,
boolean skipBaseFiles) {
return skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(relPartitionPath, endInstant)
: fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant);
}
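For context, a hedged sketch of how the skipCompaction flag passed into getInputSplits could be driven by a user-facing option; FlinkOptions.READ_STREAMING_SKIP_COMPACT is assumed to be that switch and is not verified against this patch.

// Sketch only: resolve the user-facing switch, then route the slice lookup accordingly.
boolean skipCompaction = conf.getBoolean(FlinkOptions.READ_STREAMING_SKIP_COMPACT);
Stream<FileSlice> slices = getFileSlices(fsView, relPartitionPath, endInstant, skipCompaction);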

private FileIndex getFileIndex() {
FileIndex fileIndex = FileIndex.instance(new org.apache.hadoop.fs.Path(path.toUri()), conf, rowType);
if (this.requiredPartitions != null) {
@@ -201,9 +201,8 @@ public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> cont
// table does not exist
return;
}
IncrementalInputSplits.Result result = cdcEnabled
? incrementalInputSplits.inputSplitsCDC(metaClient, this.issuedInstant)
: incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant, this.cdcEnabled);
if (result.isEmpty()) {
// no new instants, returns early
return;