Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.hudi.common.table.timeline;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
Expand All @@ -28,14 +32,10 @@
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
Expand All @@ -51,7 +51,6 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
Expand Down Expand Up @@ -147,7 +146,8 @@ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
final String action = record.get(ACTION_TYPE_KEY).toString();
if (loadDetails) {
Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
getMetadataKey(action).map(key -> {
Object actionData = record.get(key);
if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData));
} else {
Expand All @@ -159,22 +159,25 @@ private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
}

private String getMetadataKey(String action) {
@Nonnull
private Option<String> getMetadataKey(String action) {
switch (action) {
case HoodieTimeline.CLEAN_ACTION:
return "hoodieCleanMetadata";
return Option.of("hoodieCleanMetadata");
case HoodieTimeline.COMMIT_ACTION:
return "hoodieCommitMetadata";
case HoodieTimeline.DELTA_COMMIT_ACTION:
return "hoodieCommitMetadata";
return Option.of("hoodieCommitMetadata");
case HoodieTimeline.ROLLBACK_ACTION:
return "hoodieRollbackMetadata";
return Option.of("hoodieRollbackMetadata");
case HoodieTimeline.SAVEPOINT_ACTION:
return "hoodieSavePointMetadata";
return Option.of("hoodieSavePointMetadata");
case HoodieTimeline.COMPACTION_ACTION:
return "hoodieCompactionPlan";
return Option.of("hoodieCompactionPlan");
case HoodieTimeline.REPLACE_COMMIT_ACTION:
return Option.of("hoodieReplaceCommitMetadata");
default:
throw new HoodieIOException("Unknown action in metadata " + action);
LOG.error(String.format("Unknown action in metadata (%s)", action));
return Option.empty();
}
}

Expand All @@ -199,35 +202,33 @@ private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadIns
private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
Function<GenericRecord, Boolean> commitsFilter) {
try {
// list all files
// List all files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));

// sort files by version suffix in reverse (implies reverse chronological order)
// Sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());

List<HoodieInstant> instantsInRange = new ArrayList<>();
for (FileStatus fs : fsStatuses) {
//read the archived file
// Read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
int instantsInPreviousFile = instantsInRange.size();
//read the avro blocks
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
// TODO If we can store additional metadata in datablock, we can skip parsing records
// (such as startTime, endTime of records in the block)
List<IndexedRecord> records = blk.getRecords();
// filter blocks in desired time window
Stream<HoodieInstant> instantsInBlkStream = records.stream()
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails));

if (filter != null) {
instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange);
}

instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
// Filter blocks in desired time window
instantsInRange.addAll(
records.stream()
.filter(r -> commitsFilter.apply((GenericRecord) r))
.map(r -> readCommit((GenericRecord) r, loadInstantDetails))
.filter(c -> filter == null || filter.isInRange(c))
.collect(Collectors.toList())
);
}

if (filter != null) {
Expand Down