Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private String printCommits(HoodieDefaultTimeline timeline,
final List<Comparable[]> rows = new ArrayList<>();

final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstants().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList());
.getInstantsAsStream().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
Expand Down Expand Up @@ -103,7 +103,7 @@ private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
final List<Comparable[]> rows = new ArrayList<>();

final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstants().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList());
.getInstantsAsStream().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
Expand Down Expand Up @@ -372,20 +372,20 @@ public String compareCommits(@ShellOption(value = {"--path"}, help = "Path of th
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String targetLatestCommit =
targetTimeline.getInstants().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
targetTimeline.getInstantsAsStream().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
String sourceLatestCommit =
sourceTimeline.getInstants().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";
sourceTimeline.getInstantsAsStream().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";

if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
// source is behind the target
List<String> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
return "Source " + source.getTableConfig().getTableName() + " is behind by " + commitsToCatchup.size()
+ " commits. Commits to catch up - " + commitsToCatchup;
} else {
List<String> commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
return "Source " + source.getTableConfig().getTableName() + " is ahead by " + commitsToCatchup.size()
+ " commits. Commits to catch up - " + commitsToCatchup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private static String printAllCompactions(HoodieDefaultTimeline timeline,
.collect(Collectors.toList());

Set<String> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());

List<Comparable[]> rows = new ArrayList<>();
for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private String printDiffWithMetadata(HoodieDefaultTimeline timeline, Integer lim
BiFunction<HoodieWriteStat, String, Boolean> diffEntityChecker) throws IOException {
List<Comparable[]> rows = new ArrayList<>();
List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstants().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList());
.getInstantsAsStream().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
Option<byte[]> instantDetails = timeline.getInstantDetails(commit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public String exportInstants(
// The non archived instants can be listed from the Timeline.
HoodieTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline().filterCompletedInstants()
.filter(i -> actionSet.contains(i.getAction()));
List<HoodieInstant> nonArchivedInstants = timeline.getInstants().collect(Collectors.toList());
List<HoodieInstant> nonArchivedInstants = timeline.getInstants();

// Archived instants are in the commit archive files
FileStatus[] statuses = FSUtils.getFs(basePath, HoodieCLI.conf).globStatus(archivePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
timeline = timeline.filterCompletedInstants();
}

instantsStream = timeline.getInstants();
instantsStream = timeline.getInstantsAsStream();

if (!maxInstant.isEmpty()) {
final BiPredicate<String, String> predicate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public String validateSync(
private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
throws IOException {
List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
.getInstants().collect(Collectors.toList());
.getInstants();
if (commitsToCatchup.isEmpty()) {
return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+ source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void removeCorruptedPendingCleanAction() {
public void showFailedCommits() {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
activeTimeline.filterCompletedInstants().getInstants().filter(activeTimeline::isEmpty).forEach(hoodieInstant -> LOG.warn("Empty Commit: " + hoodieInstant.toString()));
activeTimeline.filterCompletedInstants().getInstantsAsStream().filter(activeTimeline::isEmpty).forEach(hoodieInstant -> LOG.warn("Empty Commit: " + hoodieInstant.toString()));
}

@ShellMethod(key = "repair migrate-partition-meta", value = "Migrate all partition meta file currently stored in text format "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public String rollbackToSavepoint(
}
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> instants = timeline.getInstants().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());
List<HoodieInstant> instants = timeline.getInstantsAsStream().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());

if (instants.isEmpty()) {
return "Commit " + instantTime + " not found in Commits " + timeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* CLI command to displays stats options.
Expand Down Expand Up @@ -73,7 +72,7 @@ public String writeAmplificationStats(

List<Comparable[]> rows = new ArrayList<>();
DecimalFormat df = new DecimalFormat("#.00");
for (HoodieInstant instantTime : timeline.getInstants().collect(Collectors.toList())) {
for (HoodieInstant instantTime : timeline.getInstants()) {
String waf = "0";
HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instantTime).get(),
HoodieCommitMetadata.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private String printTimelineInfo(
Integer limit, String sortByField, boolean descending, boolean headerOnly, boolean withRowNo,
boolean showTimeSeconds, boolean showRollbackInfo) {
Map<String, List<String>> rollbackInfo = getRolledBackInstantInfo(timeline);
final List<Comparable[]> rows = timeline.getInstants().map(instant -> {
final List<Comparable[]> rows = timeline.getInstantsAsStream().map(instant -> {
int numColumns = showRollbackInfo ? 7 : 6;
Comparable[] row = new Comparable[numColumns];
String instantTimestamp = instant.getTimestamp();
Expand Down Expand Up @@ -343,8 +343,7 @@ private Map<String, List<String>> getRolledBackInstantInfo(HoodieTimeline timeli
// Instant rolled back or to roll back -> rollback instants
Map<String, List<String>> rollbackInfoMap = new HashMap<>();
List<HoodieInstant> rollbackInstants = timeline.filter(instant ->
HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction()))
.getInstants().collect(Collectors.toList());
HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction())).getInstants();
rollbackInstants.forEach(rollbackInstant -> {
try {
if (rollbackInstant.isInflight()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ public void testArchiving() throws Exception {

//get instants in the active timeline only returns the latest state of the commit
//therefore we expect 2 instants because minCommits is 2
assertEquals(2, metaClient.getActiveTimeline().getInstants().count());
assertEquals(2, metaClient.getActiveTimeline().countInstants());

//get instants in the archived timeline returns all instants in the commit
//therefore we expect 12 instants because 6 commits - 2 commits in active timeline = 4 in archived
//since each commit is completed, there are 3 instances per commit (requested, inflight, completed)
//and 3 instances per commit * 4 commits = 12 instances
assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
assertEquals(12, metaClient.getArchivedTimeline().countInstants());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ public void testShowCleans() throws Exception {

// First, run clean
SparkMain.clean(jsc(), HoodieCLI.basePath, propsFilePath.getPath(), new ArrayList<>());
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().countInstants(),
"Loaded 1 clean and the count should match");

Object result = shell.evaluate(() -> "cleans show");
assertTrue(ShellEvaluationResultUtil.isSuccess(result));

HoodieInstant clean = metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().findFirst().orElse(null);
HoodieInstant clean = metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstantsAsStream().findFirst().orElse(null);
assertNotNull(clean);

TableHeader header =
Expand Down Expand Up @@ -163,10 +163,10 @@ public void testShowCleanPartitions() {

// First, run clean with two partition
SparkMain.clean(jsc(), HoodieCLI.basePath, propsFilePath.toString(), new ArrayList<>());
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().count(),
assertEquals(1, metaClient.getActiveTimeline().reload().getCleanerTimeline().countInstants(),
"Loaded 1 clean and the count should match");

HoodieInstant clean = metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstants().findFirst().get();
HoodieInstant clean = metaClient.getActiveTimeline().reload().getCleanerTimeline().getInstantsAsStream().findFirst().get();

Object result = shell.evaluate(() -> "clean showpartitions --clean " + clean.getTimestamp());
assertTrue(ShellEvaluationResultUtil.isSuccess(result));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ public void testCompareCommits(HoodieTableType tableType) throws Exception {

// the latest instant of test_table2 is 101
List<String> commitsToCatchup = metaClient.getActiveTimeline().findInstantsAfter("101", Integer.MAX_VALUE)
.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
String expected = String.format("Source %s is ahead by %d commits. Commits to catch up - %s",
tableName1, commitsToCatchup.size(), commitsToCatchup);
assertEquals(expected, result.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,14 +255,14 @@ public void testRemoveCorruptedPendingCleanAction() throws IOException {
// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient);
// first, there are four instants
assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
assertEquals(4, metaClient.getActiveTimeline().filterInflightsAndRequested().countInstants());

Object result = shell.evaluate(() -> "repair corrupted clean files");
assertTrue(ShellEvaluationResultUtil.isSuccess(result));

// reload meta client
metaClient = HoodieTableMetaClient.reload(metaClient);
assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().countInstants());
}

/**
Expand All @@ -283,7 +283,7 @@ public void testShowFailedCommits() {
HoodieTestCommitMetadataGenerator.createCommitFile(tablePath, timestamp, conf);
}

metaClient.getActiveTimeline().getInstants().filter(hoodieInstant -> Integer.parseInt(hoodieInstant.getTimestamp()) % 4 == 0).forEach(hoodieInstant -> {
metaClient.getActiveTimeline().getInstantsAsStream().filter(hoodieInstant -> Integer.parseInt(hoodieInstant.getTimestamp()) % 4 == 0).forEach(hoodieInstant -> {
metaClient.getActiveTimeline().deleteInstantFileIfExists(hoodieInstant);
metaClient.getActiveTimeline().createNewInstant(hoodieInstant);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testShowRollbacks() {

// get rollback instants
HoodieActiveTimeline activeTimeline = new RollbacksCommand.RollbackTimeline(HoodieCLI.getTableMetaClient());
Stream<HoodieInstant> rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants().getInstants();
Stream<HoodieInstant> rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants().getInstantsAsStream();

List<Comparable[]> rows = new ArrayList<>();
rollback.sorted().forEach(instant -> {
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testShowRollbacks() {
public void testShowRollback() throws IOException {
// get instant
HoodieActiveTimeline activeTimeline = new RollbacksCommand.RollbackTimeline(HoodieCLI.getTableMetaClient());
Stream<HoodieInstant> rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants().getInstants();
Stream<HoodieInstant> rollback = activeTimeline.getRollbackTimeline().filterCompletedInstants().getInstantsAsStream();
HoodieInstant instant = rollback.findFirst().orElse(null);
assertNotNull(instant, "The instant can not be null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ public void testClustering() throws IOException {

// assert clustering complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.filterCompletedInstants().getInstants()
.filterCompletedInstants().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending clustering must be completed");

assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstants()
.getCompletedReplaceTimeline().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending clustering must be completed");
}
Expand All @@ -156,7 +156,7 @@ public void testClusteringScheduleAndExecute() throws IOException {

// assert clustering complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.getCompletedReplaceTimeline().getInstants()
.getCompletedReplaceTimeline().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).count() > 0,
"Completed clustering couldn't be 0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void testCompact() throws IOException {

// assert compaction complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.filterCompletedInstants().getInstants()
.filterCompletedInstants().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
"Pending compaction must be completed");
}
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testCompactScheduleAndExecute() throws IOException {

// assert compaction complete
assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
.filterCompletedInstants().getInstants()
.filterCompletedInstants().getInstantsAsStream()
.map(HoodieInstant::getTimestamp).count() > 0,
"Completed compaction couldn't be 0");
}
Expand Down
Loading