Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,23 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.NumericUtils;

import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -51,6 +57,49 @@
@Component
public class CommitsCommand implements CommandMarker {

/**
 * Renders the completed commits of the given timeline as a table, newest first.
 *
 * @param timeline    timeline to read commits from (active or archived)
 * @param limit       maximum number of rows to print (-1 for no limit)
 * @param sortByField table field to sort on (empty for default ordering)
 * @param descending  whether the sort is descending
 * @param headerOnly  print only the table header
 * @return the formatted table as a string
 * @throws IOException if commit metadata cannot be read from the timeline
 */
private String printCommits(HoodieDefaultTimeline timeline,
                            final Integer limit, final String sortByField,
                            final boolean descending,
                            final boolean headerOnly) throws IOException {
  // The timeline can be read from multiple files, so an explicit sort is needed
  // instead of simply reversing the collection. Sorting in the stream avoids the
  // mutable-list + Collections.sort round trip.
  final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
      .getInstants()
      .sorted(HoodieInstant.COMPARATOR.reversed())
      .collect(Collectors.toList());

  final List<Comparable[]> rows = new ArrayList<>();
  for (final HoodieInstant commit : commits) {
    final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(commit).get(),
        HoodieCommitMetadata.class);
    rows.add(new Comparable[]{commit.getTimestamp(),
        commitMetadata.fetchTotalBytesWritten(),
        commitMetadata.fetchTotalFilesInsert(),
        commitMetadata.fetchTotalFilesUpdated(),
        commitMetadata.fetchTotalPartitionsWritten(),
        commitMetadata.fetchTotalRecordsWritten(),
        commitMetadata.fetchTotalUpdateRecordsWritten(),
        commitMetadata.fetchTotalWriteErrors()});
  }

  final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
  // Render raw byte counts in human-readable form (parseDouble avoids the boxing
  // that Double.valueOf would introduce).
  fieldNameToConverterMap.put("Total Bytes Written",
      entry -> NumericUtils.humanReadableByteCount(Double.parseDouble(entry.toString())));

  final TableHeader header = new TableHeader()
      .addTableHeaderField("CommitTime")
      .addTableHeaderField("Total Bytes Written")
      .addTableHeaderField("Total Files Added")
      .addTableHeaderField("Total Files Updated")
      .addTableHeaderField("Total Partitions Written")
      .addTableHeaderField("Total Records Written")
      .addTableHeaderField("Total Update Records Written")
      .addTableHeaderField("Total Errors");
  return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}

@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"limit"}, help = "Limit commits",
Expand All @@ -62,26 +111,39 @@ public String showCommits(
throws IOException {

HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant commit : commits) {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(), commitMetadata.fetchTotalFilesUpdated(),
commitMetadata.fetchTotalPartitionsWritten(), commitMetadata.fetchTotalRecordsWritten(),
commitMetadata.fetchTotalUpdateRecordsWritten(), commitMetadata.fetchTotalWriteErrors()});
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
return printCommits(activeTimeline, limit, sortByField, descending, headerOnly);
}

TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
.addTableHeaderField("Total Partitions Written").addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Update Records Written").addTableHeaderField("Total Errors");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
@CliCommand(value = "commits show archived", help = "Show the archived commits")
public String showArchivedCommits(
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day")
String endTs,
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1")
final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "")
final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false")
final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false")
final boolean headerOnly)
throws IOException {
if (StringUtils.isNullOrEmpty(startTs)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use a defaultValue during the cliOption itself so you don't have to use these checks ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CliOption attribute can only be assigned a static value, and I think a static default value is not that friendly. I chose '10 days before today', which is likely to be commonly used. Let me know if you would prefer a constant default date instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, that makes sense

startTs = getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
endTs = getTimeDaysAgo(1);
}
HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
try {
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
return printCommits(archivedTimeline.findInstantsInRange(startTs, endTs),
limit, sortByField, descending, headerOnly);
} finally {
// clear the instant details from memory after printing to reduce usage
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
}
}

@CliCommand(value = "commits refresh", help = "Refresh the commits")
Expand Down Expand Up @@ -241,4 +303,9 @@ public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table t
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}

/**
 * Returns the instant timestamp string for {@code numberOfDays} days before now,
 * formatted with the commit-time format.
 */
private String getTimeDaysAgo(int numberOfDays) {
  final ZonedDateTime cutoff = ZonedDateTime.now().minusDays(numberOfDays);
  return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(cutoff.toInstant()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,27 @@
package org.apache.hudi.io;

import org.apache.hudi.HoodieClientTestHarness;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import com.google.common.collect.Sets;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -197,35 +192,18 @@ public void testArchiveTableWithArchival() throws IOException {
instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105")));

// read the file
Reader reader =
HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")),
HoodieArchivedMetaEntry.getClassSchema());
int archivedRecordsCount = 0;
List<IndexedRecord> readRecords = new ArrayList<>();
// read the avro blocks and validate the number of records written in each avro block
int numBlocks = 0;
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> records = blk.getRecords();
readRecords.addAll(records);
archivedRecordsCount += records.size();
numBlocks++;
}
System.out.println("Read Records :" + readRecords.stream().map(r -> (GenericRecord) r)
.map(r -> r.get("actionType") + "_" + r.get("actionState") + "_" + r.get("commitTime")).collect(Collectors.toList()));
assertEquals("Total archived records and total read records are the same count", 24, archivedRecordsCount);
assertTrue("Average Archived records per block is greater than 1", archivedRecordsCount / numBlocks > 1);
// make sure the archived commits are the same as the (originalcommits - commitsleft)
Set<String> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> {
return r.get("commitTime").toString();
}).collect(Collectors.toSet());
HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient);
assertEquals("Total archived records and total read records are the same count",
24, archivedTimeline.countInstants());

//make sure the archived commits are the same as the (originalcommits - commitsleft)
Set<String> readCommits =
archivedTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
assertEquals("Read commits map should match the originalCommits - commitsLoadedFromArchival",
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);
originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);

// verify in-flight instants after archive
verifyInflightInstants(metaClient, 2);
reader.close();
}

@Test
Expand Down Expand Up @@ -397,6 +375,37 @@ public void testArchiveCommitCompactionNoHole() throws IOException {
timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107")));
}

/**
 * Verifies that after archiving, the archived timeline exposes exactly the
 * instants that were moved out of the active timeline.
 */
@Test
public void checkArchiveCommitTimeline() throws IOException, InterruptedException {
  HoodieWriteConfig cfg =
      HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
          .withParallelism(2, 2).forTable("test-trip-table")
          .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
          .build();
  metaClient = HoodieTableMetaClient.reload(metaClient);
  HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);

  HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
  HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
  HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf());
  HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
  HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf());
  HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");

  // Add 2 more instants to pass the filter criteria set in the compaction config above.
  // Their HoodieInstant handles are not needed: only commits 1-3 are expected to be archived.
  HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf());
  HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());

  boolean result = archiveLog.archiveIfRequired(jsc);
  assertTrue(result);

  HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
  List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
  // Use the parameterized HashSet constructor; the raw type defeats generics checking.
  assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
}

private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
.getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class HoodieWriteStat implements Serializable {
/**
* Total number of rollback blocks seen in a compaction operation.
*/
@Nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid this change as part of this diff

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Without this we are not able to read certain archived commits

private long totalRollbackBlocks;

/**
Expand Down Expand Up @@ -290,7 +291,7 @@ public long getTotalRollbackBlocks() {
return totalRollbackBlocks;
}

public void setTotalRollbackBlocks(Long totalRollbackBlocks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, we can address this as part of some other refactoring diff

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Without this we are not able to read certain archived commits

/**
 * Sets the total number of rollback blocks seen in a compaction operation.
 */
public void setTotalRollbackBlocks(long totalRollbackBlocks) {
  this.totalRollbackBlocks = totalRollbackBlocks;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,14 @@ static boolean compareTimestamps(String commit1, String commit2, BiPredicate<Str
return predicateToApply.test(commit1, commit2);
}

/**
 * Checks whether the given timestamp lies in the half-open range (startTs, endTs],
 * i.e. strictly after {@code startTs} and at or before {@code endTs}.
 */
static boolean isInRange(String timestamp, String startTs, String endTs) {
  final boolean afterStart = HoodieTimeline.compareTimestamps(timestamp, startTs, GREATER);
  final boolean atOrBeforeEnd = HoodieTimeline.compareTimestamps(timestamp, endTs, LESSER_OR_EQUAL);
  return afterStart && atOrBeforeEnd;
}

/**
 * Returns a copy of the given instant in {@code State.COMPLETED}, preserving
 * its action and timestamp.
 */
static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
  return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp());
}
Expand Down
Loading