@@ -181,4 +181,36 @@ public class HoodieTableHeaderFields {
public static final String HEADER_MT_REQUESTED_TIME = HEADER_MT_PREFIX + HEADER_REQUESTED_TIME;
public static final String HEADER_MT_INFLIGHT_TIME = HEADER_MT_PREFIX + HEADER_INFLIGHT_TIME;
public static final String HEADER_MT_COMPLETED_TIME = HEADER_MT_PREFIX + HEADER_COMPLETED_TIME;

public static TableHeader getTableHeader() {
return new TableHeader()
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
}

public static TableHeader getTableHeaderWithExtraMetadata() {
return new TableHeader()
.addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_WRITES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_INSERTS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELETES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_UPDATE_WRITES)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_BLOCKS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_CORRUPT_LOG_BLOCKS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ROLLBACK_BLOCKS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_RECORDS)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATED_RECORDS_COMPACTED)
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN);
}
}
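
For context on how these new factory methods are meant to be consumed (the PR's rewrite of CommitsCommand is not rendered further below), here is a minimal, hedged sketch. Only HoodieTableHeaderFields, TableHeader, and addTableHeaderField appear in the diff above; the surrounding CommitsHeaderSketch class and the getFieldNames() accessor are assumptions made purely for illustration.

```java
// Illustrative sketch only, not part of this PR's diff: a hudi-cli command class
// can pick one of the shared headers instead of re-listing every field itself.
// TableHeader#getFieldNames() is assumed here; use whatever accessor the class
// actually exposes.
import java.util.List;

import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;

public class CommitsHeaderSketch {

  static TableHeader chooseHeader(boolean includeExtraMetadata) {
    // The two factory methods added in this PR replace per-command header construction.
    return includeExtraMetadata
        ? HoodieTableHeaderFields.getTableHeaderWithExtraMetadata()
        : HoodieTableHeaderFields.getTableHeader();
  }

  public static void main(String[] args) {
    List<String> columns = chooseHeader(false).getFieldNames(); // assumed accessor
    columns.forEach(System.out::println); // column names of the compact commits header
  }
}
```

Presumably the point of centralizing header construction here is to keep the various commits commands from drifting out of sync on column names and order.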
229 changes: 86 additions & 143 deletions hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java

Large diffs are not rendered by default.

@@ -25,7 +25,6 @@
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.CompactionAdminClient.RenameOpResult;
@@ -71,6 +70,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo;

/**
* CLI command to display compaction related options.
*/
@@ -115,24 +116,25 @@ public String compactionShow(
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
unspecifiedDefaultValue = "false") final boolean headerOnly,
@CliOption(key = {"partition"}, help = "Partition value") final String partition)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());

return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
}

@CliCommand(value = "compactions showarchived", help = "Shows compaction details for specified time window")
public String compactionsShowArchived(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time for compactions, default: now - 10 days")
@CliOption(key = {"startTs"}, help = "start time for compactions, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for compactions, default: now - 1 day")
@CliOption(key = {"endTs"}, help = "end time for compactions, default: now - 1 day")
String endTs,
@CliOption(key = {"limit"}, help = "Limit compactions",
unspecifiedDefaultValue = "-1") final Integer limit,
@@ -141,10 +143,10 @@ public String compactionsShowArchived(
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly) {
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = CommitUtil.getTimeDaysAgo(10);
startTs = getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
endTs = CommitUtil.getTimeDaysAgo(1);
endTs = getTimeDaysAgo(1);
}

HoodieTableMetaClient client = checkAndGetMetaClient();
@@ -168,7 +170,8 @@ public String compactionShowArchived(
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
unspecifiedDefaultValue = "false") final boolean headerOnly)
unspecifiedDefaultValue = "false") final boolean headerOnly,
@CliOption(key = {"partition"}, help = "Partition value") final String partition)
throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
Expand All @@ -178,7 +181,7 @@ public String compactionShowArchived(
archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition);
} finally {
archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
}
@@ -303,13 +306,13 @@ public String compact(
/**
* Prints all compaction details.
*/
private String printAllCompactions(HoodieDefaultTimeline timeline,
Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
boolean includeExtraMetadata,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {
private static String printAllCompactions(HoodieDefaultTimeline timeline,
Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
boolean includeExtraMetadata,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {

Stream<HoodieInstant> instantsStream = timeline.getWriteTimeline().getReverseOrderedInstants();
List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
@@ -405,16 +408,19 @@ private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTim
}
}

protected String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField,
boolean descending,
int limit,
boolean headerOnly) {
protected static String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField,
boolean descending,
int limit,
boolean headerOnly,
final String partition) {
List<Comparable[]> rows = new ArrayList<>();
if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
if (StringUtils.isNullOrEmpty(partition) || partition.equals(op.getPartitionPath())) {
rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
}
}
}

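
Below is a short, self-contained sketch of the filtering behavior the new partition option adds to printCompaction: when the option is unset, every compaction operation is listed; when it is set, only operations whose partition path matches exactly are kept. The Operation class is a stand-in for HoodieCompactionOperation, and isNullOrEmpty mirrors the StringUtils.isNullOrEmpty check used in the diff; neither is part of the change itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the --partition filter; Operation is a stand-in for
// HoodieCompactionOperation and is not a Hudi class.
public class PartitionFilterSketch {

  static class Operation {
    final String partitionPath;
    final String fileId;

    Operation(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  // Mirrors the StringUtils.isNullOrEmpty check used in the patched method.
  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Same predicate the patched printCompaction applies to each operation.
  static List<Operation> filterByPartition(List<Operation> ops, String partition) {
    List<Operation> kept = new ArrayList<>();
    for (Operation op : ops) {
      if (isNullOrEmpty(partition) || partition.equals(op.partitionPath)) {
        kept.add(op);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<Operation> ops = Arrays.asList(
        new Operation("2021/03/01", "f1"),
        new Operation("2021/03/02", "f2"));
    System.out.println(filterByPartition(ops, null).size());         // 2: no partition given, nothing filtered
    System.out.println(filterByPartition(ops, "2021/03/01").size()); // 1: exact partition match only
  }
}
```

Note that the match is an exact string comparison on the partition path, so a prefix such as 2021/03 would not match 2021/03/01; the value passed to the two show commands patched above must be the full partition path.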