@@ -21,6 +21,7 @@
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -41,10 +42,8 @@
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -186,10 +185,10 @@ public String showArchivedCommits(
final boolean headerOnly)
throws IOException {
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = getTimeDaysAgo(10);
startTs = CommitUtil.getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
endTs = getTimeDaysAgo(1);
endTs = CommitUtil.getTimeDaysAgo(1);
}
HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
try {
@@ -362,10 +361,4 @@ public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table t
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}

private String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
}

}
@@ -26,16 +26,20 @@
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.func.OperationResult;
@@ -61,8 +65,10 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* CLI command to display compaction related options.
@@ -95,51 +101,9 @@ public String compactionsAll(
throws IOException {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
Set<String> committed = commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());

List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant instant : instants) {
HoodieCompactionPlan compactionPlan = null;
if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
try {
// This could be a completed compaction. Assume a compaction request file is present but skip if fails
compactionPlan = AvroUtils.deserializeCompactionPlan(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
} catch (HoodieIOException ioe) {
// SKIP
}
} else {
compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
}

if (null != compactionPlan) {
State state = instant.getState();
if (committed.contains(instant.getTimestamp())) {
state = State.COMPLETED;
}
if (includeExtraMetadata) {
rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size(),
compactionPlan.getExtraMetadata().toString()});
} else {
rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size()});
}
}
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
.addTableHeaderField("Total FileIds to be Compacted");
if (includeExtraMetadata) {
header = header.addTableHeaderField("Extra Metadata");
}
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
return printAllCompactions(activeTimeline,
compactionPlanReader(this::readCompactionPlanForActiveTimeline, activeTimeline),
includeExtraMetadata, sortByField, descending, limit, headerOnly);
}

Review thread on this hunk:

Contributor: Pass a timeline containing only commit + compaction actions, instead of activeTimeline (which may have other actions)?

    HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();

Member (author): @nbalajee printAllCompactions only calls compactionPlanReader for commits and compactions. As part of the refactor, timeline.getCommitsAndCompactionTimeline() has been moved into printAllCompactions (to reuse it between the active and archived timelines). I just verified this works as expected even in the presence of cleans. Let me know if you think there is a better way to organize this.

Contributor: Got it. LGTM.
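To make the thread above concrete, the filtering the reviewer asked about now sits at the top of printAllCompactions, whose full body appears later in this diff. A minimal excerpt:

    // printAllCompactions narrows whichever timeline it receives (active or archived)
    // to commit and compaction actions before scanning for plans, so compactionsAll
    // no longer pre-filters activeTimeline.
    Stream<HoodieInstant> instantsStream = timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();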

@CliCommand(value = "compaction show", help = "Shows compaction details for a specific compaction instant")
@@ -159,19 +123,68 @@ public String compactionShow(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

List<Comparable[]> rows = new ArrayList<>();
if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
}
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
}

@CliCommand(value = "compactions show archived", help = "Shows compaction details for specified time window")
public String compactionShowArchived(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for compactions, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for compactions, default: now - 1 day")
String endTs,
@CliOption(key = {"limit"}, help = "Limit compactions",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = CommitUtil.getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
endTs = CommitUtil.getTimeDaysAgo(1);
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
.addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
.addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
try {
return printAllCompactions(archivedTimeline,
compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
includeExtraMetadata, sortByField, descending, limit, headerOnly);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
}
}

@CliCommand(value = "compaction show archived", help = "Shows compaction details for a specific compaction instant")
public String compactionShowArchived(
@CliOption(key = "instant", mandatory = true,
help = "instant time") final String compactionInstantTime,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
String startTs = CommitUtil.addHours(compactionInstantTime, -1);
String endTs = CommitUtil.addHours(compactionInstantTime, 1);
try {
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
archivedTimeline.getInstantDetails(instant).get());
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
}
}
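For reference, a sketch of how the two archived-compaction commands above would be invoked from the Hudi CLI shell. The command names and option keys come straight from the annotations; the timestamp values are illustrative:

    compactions show archived --startTs 20200120000000 --endTs 20200202000000
    compaction show archived --instant 20200202020000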

@CliCommand(value = "compaction schedule", help = "Schedule Compaction")
@@ -249,6 +262,126 @@ public String compact(
return "Compaction successfully completed for " + compactionInstantTime;
}

/**
* Prints all compaction details.
*/
private String printAllCompactions(HoodieDefaultTimeline timeline,
Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
boolean includeExtraMetadata,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {

Stream<HoodieInstant> instantsStream = timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();
List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
.map(instant -> Pair.of(instant, compactionPlanReader.apply(instant)))
.filter(pair -> pair.getRight() != null)
.collect(Collectors.toList());

Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
.getInstants().collect(Collectors.toSet());
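// committedInstants drives the display state below: an instant whose commit
// has completed is shown as COMPLETED, even if the compaction instant itself
// is still recorded in an earlier state.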

List<Comparable[]> rows = new ArrayList<>();
for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
HoodieCompactionPlan plan = compactionPlan.getRight();
HoodieInstant instant = compactionPlan.getLeft();
final HoodieInstant.State state;
if (committedInstants.contains(instant)) {
state = HoodieInstant.State.COMPLETED;
} else {
state = instant.getState();
}

if (includeExtraMetadata) {
rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
plan.getOperations() == null ? 0 : plan.getOperations().size(),
plan.getExtraMetadata().toString()});
} else {
rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
plan.getOperations() == null ? 0 : plan.getOperations().size()});
}
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
.addTableHeaderField("Total FileIds to be Compacted");
if (includeExtraMetadata) {
header = header.addTableHeaderField("Extra Metadata");
}
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}

/**
* Compaction plan reading differs across timelines, so we build a partial function to plug in the
* timeline-specific logic. These read methods could become part of HoodieDefaultTimeline and be
* overridden where necessary, but the BiFunction below has 'hacky' exception blocks, so it is
* restricted to the CLI.
*/
private <T extends HoodieDefaultTimeline, U extends HoodieInstant, V extends HoodieCompactionPlan>
Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(
BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {

return (y) -> f.apply(timeline, y);
}
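As a usage sketch, this is how the curried reader for the active timeline is built and applied (mirroring the call in compactionsAll above; someInstant stands for any instant taken from the timeline):

    Function<HoodieInstant, HoodieCompactionPlan> reader =
        compactionPlanReader(this::readCompactionPlanForActiveTimeline, activeTimeline);
    // The reader returns null for instants that have no readable compaction plan.
    HoodieCompactionPlan plan = reader.apply(someInstant);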

private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
HoodieInstant instant) {
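// Note: callers are expected to have loaded the surrounding window via
// archivedTimeline.loadInstantDetailsInMemory(startTs, endTs) beforehand,
// as the archived show commands above do.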
if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
return null;
} else {
try {
return AvroUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
}

/**
* TBD: Can this be made part of HoodieActiveTimeline or a utility class?
*/
private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline,
HoodieInstant instant) {
try {
if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
try {
// This could be a completed compaction. Assume a compaction request file is present; skip if reading it fails.
return AvroUtils.deserializeCompactionPlan(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
} catch (HoodieIOException ioe) {
// SKIP
return null;
}
} else {
return AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}

private String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {
List<Comparable[]> rows = new ArrayList<>();
if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
}
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
.addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
.addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}

private static String getTmpSerializerFile() {
return TMP_DIR + UUID.randomUUID().toString() + ".ser";
}
23 changes: 23 additions & 0 deletions hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -21,9 +21,15 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.io.IOException;
import java.text.ParseException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.List;

/**
@@ -42,4 +48,21 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co
}
return totalNew;
}

public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
}

/**
* Add hours to the specified time. If hours < 0, hours are subtracted.
* For example, with compactionCommitTime "20200202020000":
* a) hours: +1, returns "20200202030000"
* b) hours: -1, returns "20200202010000"
*/
public static String addHours(String compactionCommitTime, int hours) throws ParseException {
Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
}
}
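A minimal usage sketch of the two new helpers. Timestamps follow HoodieActiveTimeline.COMMIT_FORMATTER (the yyyyMMddHHmmss shape seen in the javadoc example above); the concrete values in comments are illustrative:

    // Default lookback window used by the archived commits/compactions commands:
    String startTs = CommitUtil.getTimeDaysAgo(10); // e.g. "20200123020000"
    String endTs = CommitUtil.getTimeDaysAgo(1);    // e.g. "20200201020000"

    // compactionShowArchived widens a single instant into a +/- 1 hour window
    // before loading archived instant details; addHours throws ParseException:
    String windowStart = CommitUtil.addHours("20200202020000", -1); // "20200202010000"
    String windowEnd = CommitUtil.addHours("20200202020000", 1);    // "20200202030000"

Since the helpers format with the same fixed-width formatter they parse with, the returned strings compare lexicographically in chronological order against instant timestamps on the timeline.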