diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java index 3bea0f2293d26..e6016e4cc1cb7 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java @@ -181,4 +181,36 @@ public class HoodieTableHeaderFields { public static final String HEADER_MT_REQUESTED_TIME = HEADER_MT_PREFIX + HEADER_REQUESTED_TIME; public static final String HEADER_MT_INFLIGHT_TIME = HEADER_MT_PREFIX + HEADER_INFLIGHT_TIME; public static final String HEADER_MT_COMPLETED_TIME = HEADER_MT_PREFIX + HEADER_COMPLETED_TIME; + + public static TableHeader getTableHeader() { + return new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); + } + + public static TableHeader getTableHeaderWithExtraMetadata() { + return new TableHeader() + .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_WRITES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_INSERTS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELETES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_UPDATE_WRITES) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_BLOCKS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_CORRUPT_LOG_BLOCKS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ROLLBACK_BLOCKS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_RECORDS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATED_RECORDS_COMPACTED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN); + } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 7c74110b36a24..c1ed884315f17 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -22,14 +22,10 @@ import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; -import org.apache.hudi.cli.utils.CommitUtil; -import org.apache.hudi.cli.utils.InputStreamConsumer; -import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -38,7 +34,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; -import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; @@ -47,13 +42,15 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo; +import static org.apache.hudi.common.table.timeline.TimelineUtils.getTimeline; + /** * CLI command to display commits options. */ @@ -61,46 +58,38 @@ public class CommitsCommand implements CommandMarker { private String printCommits(HoodieDefaultTimeline timeline, - final Integer limit, final String sortByField, + final Integer limit, + final String sortByField, final boolean descending, final boolean headerOnly, final String tempTableName) throws IOException { final List rows = new ArrayList<>(); final List commits = timeline.getCommitsTimeline().filterCompletedInstants() - .getInstants().collect(Collectors.toList()); - // timeline can be read from multiple files. So sort is needed instead of reversing the collection - Collections.sort(commits, HoodieInstant.COMPARATOR.reversed()); - - for (int i = 0; i < commits.size(); i++) { - final HoodieInstant commit = commits.get(i); - final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - timeline.getInstantDetails(commit).get(), - HoodieCommitMetadata.class); - rows.add(new Comparable[] {commit.getTimestamp(), - commitMetadata.fetchTotalBytesWritten(), - commitMetadata.fetchTotalFilesInsert(), - commitMetadata.fetchTotalFilesUpdated(), - commitMetadata.fetchTotalPartitionsWritten(), - commitMetadata.fetchTotalRecordsWritten(), - commitMetadata.fetchTotalUpdateRecordsWritten(), - commitMetadata.fetchTotalWriteErrors()}); + .getInstants().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList()); + + for (final HoodieInstant commit : commits) { + if (timeline.getInstantDetails(commit).isPresent()) { + final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); + rows.add(new Comparable[] {commit.getTimestamp(), + commitMetadata.fetchTotalBytesWritten(), + commitMetadata.fetchTotalFilesInsert(), + commitMetadata.fetchTotalFilesUpdated(), + commitMetadata.fetchTotalPartitionsWritten(), + commitMetadata.fetchTotalRecordsWritten(), + commitMetadata.fetchTotalUpdateRecordsWritten(), + commitMetadata.fetchTotalWriteErrors()}); + } } final Map> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { - return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); - }); + fieldNameToConverterMap.put( + HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, + entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); - final TableHeader header = new TableHeader() - 
.addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); + final TableHeader header = HoodieTableHeaderFields.getTableHeader(); return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows, tempTableName); @@ -110,80 +99,66 @@ private String printCommitsWithMetadata(HoodieDefaultTimeline timeline, final Integer limit, final String sortByField, final boolean descending, final boolean headerOnly, - final String tempTableName) throws IOException { + final String tempTableName, + final String partition) throws IOException { final List rows = new ArrayList<>(); final List commits = timeline.getCommitsTimeline().filterCompletedInstants() - .getInstants().collect(Collectors.toList()); - // timeline can be read from multiple files. So sort is needed instead of reversing the collection - Collections.sort(commits, HoodieInstant.COMPARATOR.reversed()); - - for (int i = 0; i < commits.size(); i++) { - final HoodieInstant commit = commits.get(i); - final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - timeline.getInstantDetails(commit).get(), - HoodieCommitMetadata.class); - - for (Map.Entry> partitionWriteStat : - commitMetadata.getPartitionToWriteStats().entrySet()) { - for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) { - rows.add(new Comparable[] {commit.getAction(), commit.getTimestamp(), hoodieWriteStat.getPartitionPath(), - hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(), - hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(), - hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(), - hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(), - hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(), - hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes() - }); + .getInstants().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList()); + + for (final HoodieInstant commit : commits) { + if (timeline.getInstantDetails(commit).isPresent()) { + final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); + + for (Map.Entry> partitionWriteStat : + commitMetadata.getPartitionToWriteStats().entrySet()) { + for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) { + if (StringUtils.isNullOrEmpty(partition) || partition.equals(hoodieWriteStat.getPartitionPath())) { + rows.add(new Comparable[] {commit.getAction(), commit.getTimestamp(), hoodieWriteStat.getPartitionPath(), + hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(), + hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(), + hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(), + hoodieWriteStat.getTotalLogBlocks(), hoodieWriteStat.getTotalCorruptLogBlock(), + 
hoodieWriteStat.getTotalRollbackBlocks(), hoodieWriteStat.getTotalLogRecords(), + hoodieWriteStat.getTotalUpdatedRecordsCompacted(), hoodieWriteStat.getTotalWriteBytes() + }); + } + } } } } final Map> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { - return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); - }); + fieldNameToConverterMap.put( + HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, + entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_WRITES) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_INSERTS) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELETES) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_UPDATE_WRITES) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_BLOCKS) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_CORRUPT_LOG_BLOCKS) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ROLLBACK_BLOCKS) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_LOG_RECORDS) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATED_RECORDS_COMPACTED) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN); - - return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, - limit, headerOnly, rows, tempTableName); + return HoodiePrintHelper.print(HoodieTableHeaderFields.getTableHeaderWithExtraMetadata(), + fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows, tempTableName); } @CliCommand(value = "commits show", help = "Show the commits") public String showCommits( @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata", unspecifiedDefaultValue = "false") final boolean includeExtraMetadata, - @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + @CliOption(key = {"createView"}, help = "view name to store output table", unspecifiedDefaultValue = "") final String exportTableName, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) - throws IOException { - - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); + unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"partition"}, help = "Partition value") final String partition, + @CliOption(key = {"includeArchivedTimeline"}, help = "Include archived commits as well", + unspecifiedDefaultValue = "false") final boolean includeArchivedTimeline) throws IOException { + HoodieDefaultTimeline timeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline); 
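+ // When --includeArchivedTimeline is set, getTimeline(...) merges the archived timeline into the
+ // active one (see the new TimelineUtils.getTimeline at the end of this patch); instants whose
+ // details are unavailable are skipped by the isPresent() guards in the print helpers above.
+ // A sample invocation exercised by the tests:
+ //   commits show --includeExtraMetadata true --includeArchivedTimeline true --partition 2015/03/16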
if (includeExtraMetadata) { - return printCommitsWithMetadata(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName); + return printCommitsWithMetadata(timeline, limit, sortByField, descending, headerOnly, exportTableName, partition); } else { - return printCommits(activeTimeline, limit, sortByField, descending, headerOnly, exportTableName); + return printCommits(timeline, limit, sortByField, descending, headerOnly, exportTableName); } } @@ -200,20 +175,21 @@ public String showArchivedCommits( @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, - @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) + @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"partition"}, help = "Partition value") final String partition) throws IOException { if (StringUtils.isNullOrEmpty(startTs)) { - startTs = CommitUtil.getTimeDaysAgo(10); + startTs = getTimeDaysAgo(10); } if (StringUtils.isNullOrEmpty(endTs)) { - endTs = CommitUtil.getTimeDaysAgo(1); + endTs = getTimeDaysAgo(1); } HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline(); try { archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs); if (includeExtraMetadata) { - return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName); + return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName, partition); } else { return printCommits(timelineRange, limit, sortByField, descending, headerOnly, exportTableName); } @@ -223,51 +199,20 @@ public String showArchivedCommits( } } - @CliCommand(value = "commit rollback", help = "Rollback a commit") - public String rollbackCommit( - @CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime, - @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, - @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, - @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", - help = "Spark executor memory") final String sparkMemory, - @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "false", - help = "Enabling marker based rollback") final String rollbackUsingMarkers) - throws Exception { - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); - HoodieTimeline filteredTimeline = completedTimeline.filter(instant -> instant.getTimestamp().equals(instantTime)); - if (filteredTimeline.empty()) { - return "Commit " + instantTime + " not found in Commits " + completedTimeline; - } - - SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); - sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime, - HoodieCLI.getTableMetaClient().getBasePath(), rollbackUsingMarkers); - Process process = 
sparkLauncher.launch(); - InputStreamConsumer.captureOutput(process); - int exitCode = process.waitFor(); - // Refresh the current - HoodieCLI.refreshTableMetadata(); - if (exitCode != 0) { - return "Commit " + instantTime + " failed to roll back"; - } - return "Commit " + instantTime + " rolled back"; - } - @CliCommand(value = "commit showpartitions", help = "Show partition level details of a commit") public String showCommitPartitions( - @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + @CliOption(key = {"createView"}, help = "view name to store output table", unspecifiedDefaultValue = "") final String exportTableName, @CliOption(key = {"commit"}, help = "Commit to show") final String instantTime, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) - throws Exception { - - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); + unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"includeArchivedTimeline"}, help = "Include archived commits as well", + unspecifiedDefaultValue = "false") final boolean includeArchivedTimeline) throws Exception { + HoodieDefaultTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline); + HoodieTimeline timeline = defaultTimeline.getCommitsTimeline().filterCompletedInstants(); Option hoodieInstantOption = getCommitForInstant(timeline, instantTime); Option commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption); @@ -322,18 +267,18 @@ public String showCommitPartitions( @CliCommand(value = "commit show_write_stats", help = "Show write stats of a commit") public String showWriteStats( - @CliOption(key = {"createView"}, mandatory = false, help = "view name to store output table", + @CliOption(key = {"createView"}, help = "view name to store output table", unspecifiedDefaultValue = "") final String exportTableName, @CliOption(key = {"commit"}, help = "Commit to show") final String instantTime, @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) - throws Exception { - - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); + unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"includeArchivedTimeline"}, help = "Include archived commits as well", + unspecifiedDefaultValue = "false") final boolean includeArchivedTimeline) throws Exception { + HoodieDefaultTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline); + HoodieTimeline timeline = 
defaultTimeline.getCommitsTimeline().filterCompletedInstants(); Option hoodieInstantOption = getCommitForInstant(timeline, instantTime); Option commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption); @@ -373,11 +318,11 @@ public String showCommitFiles( @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) - throws Exception { - - HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); + unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"includeArchivedTimeline"}, help = "Include archived commits as well", + unspecifiedDefaultValue = "false") final boolean includeArchivedTimeline) throws Exception { + HoodieDefaultTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline); + HoodieTimeline timeline = defaultTimeline.getCommitsTimeline().filterCompletedInstants(); Option hoodieInstantOption = getCommitForInstant(timeline, instantTime); Option commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption); @@ -450,15 +395,13 @@ public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table t /* Checks whether a commit or replacecommit action exists in the timeline. * */ - private Option getCommitForInstant(HoodieTimeline timeline, String instantTime) throws IOException { + private Option getCommitForInstant(HoodieTimeline timeline, String instantTime) { List instants = Arrays.asList( new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime), new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime), new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime)); - Option hoodieInstant = Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny()); - - return hoodieInstant; + return Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny()); } private Option getHoodieCommitMetadata(HoodieTimeline timeline, Option hoodieInstant) throws IOException { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index c979e1840f28b..136546ddaef42 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -25,7 +25,6 @@ import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.commands.SparkMain.SparkCommand; -import org.apache.hudi.cli.utils.CommitUtil; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.CompactionAdminClient.RenameOpResult; @@ -71,6 +70,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo; + /** * CLI command to display compaction related options. 
*/ @@ -115,7 +116,8 @@ public String compactionShow( @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) + unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"partition"}, help = "Partition value") final String partition) throws Exception { HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieActiveTimeline activeTimeline = client.getActiveTimeline(); @@ -123,16 +125,16 @@ public String compactionShow( activeTimeline.readCompactionPlanAsBytes( HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get()); - return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly); + return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition); } @CliCommand(value = "compactions showarchived", help = "Shows compaction details for specified time window") public String compactionsShowArchived( @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata", unspecifiedDefaultValue = "false") final boolean includeExtraMetadata, - @CliOption(key = {"startTs"}, mandatory = false, help = "start time for compactions, default: now - 10 days") + @CliOption(key = {"startTs"}, help = "start time for compactions, default: now - 10 days") String startTs, - @CliOption(key = {"endTs"}, mandatory = false, help = "end time for compactions, default: now - 1 day") + @CliOption(key = {"endTs"}, help = "end time for compactions, default: now - 1 day") String endTs, @CliOption(key = {"limit"}, help = "Limit compactions", unspecifiedDefaultValue = "-1") final Integer limit, @@ -141,10 +143,10 @@ public String compactionsShowArchived( @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly) { if (StringUtils.isNullOrEmpty(startTs)) { - startTs = CommitUtil.getTimeDaysAgo(10); + startTs = getTimeDaysAgo(10); } if (StringUtils.isNullOrEmpty(endTs)) { - endTs = CommitUtil.getTimeDaysAgo(1); + endTs = getTimeDaysAgo(1); } HoodieTableMetaClient client = checkAndGetMetaClient(); @@ -168,7 +170,8 @@ public String compactionShowArchived( @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, @CliOption(key = {"headeronly"}, help = "Print Header Only", - unspecifiedDefaultValue = "false") final boolean headerOnly) + unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"partition"}, help = "Partition value") final String partition) throws Exception { HoodieTableMetaClient client = checkAndGetMetaClient(); HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline(); @@ -178,7 +181,7 @@ public String compactionShowArchived( archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime); HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata( archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema()); - return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly); + return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly, partition); } finally { 
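+ // Evict the compaction plan details loaded into memory above once the command finishes,
+ // so the archived timeline does not keep them in memory after the command completes.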
archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime); } @@ -303,13 +306,13 @@ public String compact( /** * Prints all compaction details. */ - private String printAllCompactions(HoodieDefaultTimeline timeline, - Function compactionPlanReader, - boolean includeExtraMetadata, - String sortByField, - boolean descending, - int limit, - boolean headerOnly) { + private static String printAllCompactions(HoodieDefaultTimeline timeline, + Function compactionPlanReader, + boolean includeExtraMetadata, + String sortByField, + boolean descending, + int limit, + boolean headerOnly) { Stream instantsStream = timeline.getWriteTimeline().getReverseOrderedInstants(); List> compactionPlans = instantsStream @@ -405,16 +408,19 @@ private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTim } } - protected String printCompaction(HoodieCompactionPlan compactionPlan, - String sortByField, - boolean descending, - int limit, - boolean headerOnly) { + protected static String printCompaction(HoodieCompactionPlan compactionPlan, + String sortByField, + boolean descending, + int limit, + boolean headerOnly, + final String partition) { List rows = new ArrayList<>(); if ((null != compactionPlan) && (null != compactionPlan.getOperations())) { for (HoodieCompactionOperation op : compactionPlan.getOperations()) { - rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(), - op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()}); + if (StringUtils.isNullOrEmpty(partition) || partition.equals(op.getPartitionPath())) { + rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(), + op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()}); + } } } diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DiffCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DiffCommand.java new file mode 100644 index 0000000000000..29b5c6e51c3dc --- /dev/null +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DiffCommand.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.NumericUtils; +import org.apache.hudi.common.util.Option; +
+import org.springframework.shell.core.CommandMarker; +import org.springframework.shell.core.annotation.CliCommand; +import org.springframework.shell.core.annotation.CliOption; +import org.springframework.stereotype.Component; +
+import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +
+import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.common.util.StringUtils.nonEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +
+/** + * Given a file id or partition value, this command line utility tracks the changes to the file group or partition across a range of commits. + * Usage: diff file --fileId + */ +@Component +public class DiffCommand implements CommandMarker { +
+ private static final BiFunction FILE_ID_CHECKER = (writeStat, fileId) -> fileId.equals(writeStat.getFileId()); + private static final BiFunction PARTITION_CHECKER = (writeStat, partitionPath) -> partitionPath.equals(writeStat.getPartitionPath()); +
+ @CliCommand(value = "diff file", help = "Check how a file differs across a range of commits") + public String diffFile( + @CliOption(key = {"fileId"}, help = "File ID to diff across a range of commits", mandatory = true) String fileId, + @CliOption(key = {"startTs"}, help = "start time for commits, default: now - 10 days") String startTs, + @CliOption(key = {"endTs"}, help = "end time for commits, default: now - 1 day") String endTs, + @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, + @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"includeArchivedTimeline"}, help = "Include archived commits as well", + unspecifiedDefaultValue = "false") final boolean includeArchivedTimeline) throws IOException { + HoodieDefaultTimeline timeline = getTimelineInRange(startTs, endTs, includeArchivedTimeline); + return printCommitsWithMetadataForFileId(timeline, limit, sortByField, descending, headerOnly, "", fileId); + } +
+ @CliCommand(value = "diff partition", help = "Check how a partition differs across a range of commits. It is meant to be used only for partitioned tables.") + public String diffPartition( + @CliOption(key = {"partitionPath"}, help = "Relative partition path to diff across a range of commits", mandatory = true) String partitionPath, + @CliOption(key = {"startTs"}, help = "start time for commits, default: now - 10 days") String startTs, + @CliOption(key = {"endTs"}, help = "end time for commits, default: now - 1 day") String endTs, + @CliOption(key = {"limit"}, help = "Limit commits", unspecifiedDefaultValue = "-1") final Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending, + @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") final boolean headerOnly, + @CliOption(key = {"includeArchivedTimeline"}, help = "Include archived commits as well", + unspecifiedDefaultValue = "false") final boolean includeArchivedTimeline) throws IOException { + HoodieDefaultTimeline timeline = getTimelineInRange(startTs, endTs, includeArchivedTimeline); + return printCommitsWithMetadataForPartition(timeline, limit, sortByField, descending, headerOnly, "", partitionPath); + } +
+ private HoodieDefaultTimeline getTimelineInRange(String startTs, String endTs, boolean includeArchivedTimeline) { + if (isNullOrEmpty(startTs)) { + startTs = getTimeDaysAgo(10); + } + if (isNullOrEmpty(endTs)) { + endTs = getTimeDaysAgo(1); + } + checkArgument(nonEmpty(startTs), "startTs is null or empty"); + checkArgument(nonEmpty(endTs), "endTs is null or empty"); + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + if (includeArchivedTimeline) { + HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); + archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); + return archivedTimeline.findInstantsInRange(startTs, endTs).mergeTimeline(activeTimeline); + } + return activeTimeline; + } +
+ private String printCommitsWithMetadataForFileId(HoodieDefaultTimeline timeline, + final Integer limit, + final String sortByField, + final boolean descending, + final boolean headerOnly, + final String tempTableName, + final String fileId) throws IOException { + return printDiffWithMetadata(timeline, limit, sortByField, descending, headerOnly, tempTableName, fileId, FILE_ID_CHECKER); + } +
+ private String printCommitsWithMetadataForPartition(HoodieDefaultTimeline timeline, + final Integer limit, + final String sortByField, + final boolean descending, + final boolean headerOnly, + final String tempTableName, + final String partition) throws IOException { + return printDiffWithMetadata(timeline, limit, sortByField, descending, headerOnly, tempTableName, partition, PARTITION_CHECKER); + } +
+ private String printDiffWithMetadata(HoodieDefaultTimeline timeline, Integer limit, String sortByField, boolean descending, boolean headerOnly, String tempTableName, String diffEntity, + BiFunction diffEntityChecker) throws IOException { + List rows = new ArrayList<>(); + List commits = timeline.getCommitsTimeline().filterCompletedInstants() + .getInstants().sorted(HoodieInstant.COMPARATOR.reversed()).collect(Collectors.toList()); +
+ for (final HoodieInstant commit : commits) { + Option instantDetails = timeline.getInstantDetails(commit); + if (instantDetails.isPresent()) { + HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(instantDetails.get(), HoodieCommitMetadata.class); + for (Map.Entry> partitionWriteStat : + commitMetadata.getPartitionToWriteStats().entrySet()) { + for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) { + populateRows(rows, commit, hoodieWriteStat, diffEntity, diffEntityChecker); + } + } + } + } + + Map> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put( + HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, + entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); + + return HoodiePrintHelper.print(HoodieTableHeaderFields.getTableHeaderWithExtraMetadata(), + fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows, tempTableName); + } + + private void populateRows(List rows, HoodieInstant commit, HoodieWriteStat hoodieWriteStat, + String value, BiFunction checker) { + if (checker.apply(hoodieWriteStat, value)) { + rows.add(new Comparable[] { + commit.getAction(), + commit.getTimestamp(), + hoodieWriteStat.getPartitionPath(), + hoodieWriteStat.getFileId(), + hoodieWriteStat.getPrevCommit(), + hoodieWriteStat.getNumWrites(), + hoodieWriteStat.getNumInserts(), + hoodieWriteStat.getNumDeletes(), + hoodieWriteStat.getNumUpdateWrites(), + hoodieWriteStat.getTotalWriteErrors(), + hoodieWriteStat.getTotalLogBlocks(), + hoodieWriteStat.getTotalCorruptLogBlock(), + hoodieWriteStat.getTotalRollbackBlocks(), + hoodieWriteStat.getTotalLogRecords(), + hoodieWriteStat.getTotalUpdatedRecordsCompacted(), + hoodieWriteStat.getTotalWriteBytes() + }); + } + } +} diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncValidateCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncValidateCommand.java index 30b42552bb16c..35e9b2b016b67 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncValidateCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncValidateCommand.java @@ -19,7 +19,6 @@ package org.apache.hudi.cli.commands; import org.apache.hudi.cli.HoodieCLI; -import org.apache.hudi.cli.utils.CommitUtil; import org.apache.hudi.cli.utils.HiveUtil; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -35,6 +34,8 @@ import java.util.List; import java.util.stream.Collectors; +import static org.apache.hudi.cli.utils.CommitUtil.countNewRecords; + /** * CLI command to display sync options. */ @@ -96,7 +97,7 @@ private String getString(HoodieTableMetaClient target, HoodieTimeline targetTime return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count(" + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount); } else { - long newInserts = CommitUtil.countNewRecords(target, + long newInserts = countNewRecords(target, commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList())); return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count(" + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". 
Catch up count is " diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java index 8d1d21ac903ea..3040e0f6a1c12 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java @@ -23,6 +23,8 @@ import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.utils.InputStreamConsumer; +import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -32,6 +34,7 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; import org.springframework.shell.core.annotation.CliOption; @@ -122,6 +125,37 @@ public String showRollback( return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows); } + @CliCommand(value = "commit rollback", help = "Rollback a commit") + public String rollbackCommit( + @CliOption(key = {"commit"}, help = "Commit to rollback") final String instantTime, + @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath, + @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master, + @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G", + help = "Spark executor memory") final String sparkMemory, + @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "false", + help = "Enabling marker based rollback") final String rollbackUsingMarkers) + throws Exception { + HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); + HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); + HoodieTimeline filteredTimeline = completedTimeline.filter(instant -> instant.getTimestamp().equals(instantTime)); + if (filteredTimeline.empty()) { + return "Commit " + instantTime + " not found in Commits " + completedTimeline; + } + + SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath); + sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime, + HoodieCLI.getTableMetaClient().getBasePath(), rollbackUsingMarkers); + Process process = sparkLauncher.launch(); + InputStreamConsumer.captureOutput(process); + int exitCode = process.waitFor(); + // Refresh the current + HoodieCLI.refreshTableMetadata(); + if (exitCode != 0) { + return "Commit " + instantTime + " failed to roll back"; + } + return "Commit " + instantTime + " rolled back"; + } + /** * An Active timeline containing only rollbacks. 
*/ diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java index 5d58aa9d2e498..21910fd956dfe 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java @@ -34,9 +34,9 @@ */ public class CommitUtil { - public static long countNewRecords(HoodieTableMetaClient target, List commitsToCatchup) throws IOException { + public static long countNewRecords(HoodieTableMetaClient metaClient, List commitsToCatchup) throws IOException { long totalNew = 0; - HoodieTimeline timeline = target.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); for (String commit : commitsToCatchup) { HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes( timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(), diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index d24c62df3da1c..0a06749523e8f 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -170,21 +170,9 @@ private String generateExpectData(int records, Map data) thro }); final Map> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> { - return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); - }); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); - final TableHeader header = new TableHeader() - .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMMIT_TIME) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_PARTITIONS_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_UPDATE_RECORDS_WRITTEN) - .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); - - return HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, + return HoodiePrintHelper.print(HoodieTableHeaderFields.getTableHeader(), fieldNameToConverterMap, "", false, -1, false, rows); } @@ -204,12 +192,67 @@ public void testShowCommits() throws Exception { assertEquals(expected, got); } + @Test + public void testShowCommitsIncludingArchivedTimeline() throws Exception { + Map data = generateDataAndArchive(true); + data.remove("101"); + data.remove("102"); + + CommandResult cr = shell().executeCommand("commits show --includeExtraMetadata true --includeArchivedTimeline true --partition 2015/03/16"); + assertTrue(cr.isSuccess()); + + String expected = generateExpectDataWithExtraMetadata(1, data); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + private String generateExpectDataWithExtraMetadata(int records, Map data) throws IOException { + List rows = new 
ArrayList<>(); + data.forEach((key, value) -> { + for (int i = 0; i < records; i++) { + // there are more than 1 partitions, so need to * partitions + rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, key, "2015/03/16", HoodieTestCommitMetadataGenerator.DEFAULT_FILEID, + HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT, key.equals("104") ? "20" : "15", "0", "0", key.equals("104") ? "10" : "15", + "0", HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_BLOCKS, "0", "0", HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_RECORDS, + "0", HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES}); + } + }); + + final Map> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); + + final TableHeader header = HoodieTableHeaderFields.getTableHeaderWithExtraMetadata(); + + return HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, + -1, false, rows); + } + /** * Test case of 'commits showarchived' command. */ @ParameterizedTest @ValueSource(booleans = {true, false}) public void testShowArchivedCommits(boolean enableMetadataTable) throws Exception { + Map data = generateDataAndArchive(enableMetadataTable); + + CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); + assertTrue(cr.isSuccess()); + + // archived 101 and 102 instant, generate expect data + assertEquals(2, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(), + "There should 2 instants not be archived!"); + + // archived 101 and 102 instants, remove 103 and 104 instant + data.remove("103"); + data.remove("104"); + String expected = generateExpectData(1, data); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + private Map generateDataAndArchive(boolean enableMetadataTable) throws Exception { // Generate archive HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1) .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) @@ -245,21 +288,7 @@ public void testShowArchivedCommits(boolean enableMetadataTable) throws Exceptio HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient); HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table); archiver.archiveIfRequired(context()); - - CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); - assertTrue(cr.isSuccess()); - - // archived 101 and 102 instant, generate expect data - assertEquals(2, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(), - "There should 2 instants not be archived!"); - - // archived 101 and 102 instants, remove 103 and 104 instant - data.remove("103"); - data.remove("104"); - String expected = generateExpectData(1, data); - expected = removeNonWordAndStripSpace(expected); - String got = removeNonWordAndStripSpace(cr.getResult().toString()); - assertEquals(expected, got); + return data; } @ParameterizedTest diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java index e61a7ade7c5b9..e909e5c9ea28b 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java +++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java @@ -225,7 +225,7 @@ public void testCompactionShowArchived() throws IOException { CommandResult cr = shell().executeCommand("compaction showarchived --instant " + instance); // generate expected - String expected = new CompactionCommand().printCompaction(plan, "", false, -1, false); + String expected = CompactionCommand.printCompaction(plan, "", false, -1, false, null); expected = removeNonWordAndStripSpace(expected); String got = removeNonWordAndStripSpace(cr.getResult().toString()); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java new file mode 100644 index 0000000000000..ed5e873bc0c98 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.cli.commands; + +import org.apache.hudi.cli.HoodieCLI; +import org.apache.hudi.cli.HoodiePrintHelper; +import org.apache.hudi.cli.HoodieTableHeaderFields; +import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.cli.functional.CLIFunctionalTestHarness; +import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.NumericUtils; +import org.apache.hudi.common.util.Option; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.springframework.shell.core.CommandResult; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test Cases for {@link DiffCommand}. 
+ */ +@Tag("functional") +public class TestDiffCommand extends CLIFunctionalTestHarness { + + private String tableName; + private String tablePath; + + @BeforeEach + public void init() { + tableName = tableName(); + tablePath = tablePath(tableName); + } + + @Test + public void testDiffFile() throws Exception { + // create COW table. + new TableCommand().createTable( + tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), + "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName()); + + Configuration conf = HoodieCLI.conf; + + HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient(); + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + FileSystem fs = FSUtils.getFs(basePath(), hadoopConf()); + HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, tablePath); + + // Create four commits + Set commits = new HashSet<>(); + for (int i = 100; i < 104; i++) { + String timestamp = String.valueOf(i); + commits.add(timestamp); + // Requested Compaction + HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, + new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); + // Inflight Compaction + HoodieTestCommitMetadataGenerator.createCompactionAuxiliaryMetadata(tablePath, + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); + + Map extraCommitMetadata = + Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2, + Option.empty(), Option.empty(), extraCommitMetadata, false); + } + + HoodieTableMetaClient.reload(metaClient); + + CommandResult cr = shell().executeCommand(String.format("diff file --fileId %s", fileId1)); + assertTrue(cr.isSuccess()); + String expected = generateExpectDataWithExtraMetadata(commits, fileId1, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + + private String generateExpectDataWithExtraMetadata(Set commits, String fileId, String partition) { + List rows = new ArrayList<>(); + commits.stream().sorted(Comparator.reverseOrder()).forEach(commit -> rows.add(new Comparable[] { + HoodieTimeline.COMMIT_ACTION, + commit, + partition, + fileId, + HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT, + HoodieTestCommitMetadataGenerator.DEFAULT_NUM_WRITES, + "0", + "0", + HoodieTestCommitMetadataGenerator.DEFAULT_NUM_UPDATE_WRITES, + "0", + HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_BLOCKS, + "0", + "0", + HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_LOG_RECORDS, + "0", + HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES})); + + final Map> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); + + final TableHeader header = HoodieTableHeaderFields.getTableHeaderWithExtraMetadata(); + + return HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows); + } +} diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index e7970baf673e8..7324421894c0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -399,4 +399,19 @@ public boolean isEmpty(HoodieInstant instant) {
   public String toString() {
     return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(","));
   }
+
+  /**
+   * Merge this timeline with the given timeline.
+   */
+  public HoodieDefaultTimeline mergeTimeline(HoodieDefaultTimeline timeline) {
+    Stream<HoodieInstant> instantStream = Stream.concat(instants.stream(), timeline.getInstants()).sorted();
+    Function<HoodieInstant, Option<byte[]>> details = instant -> {
+      if (instants.stream().anyMatch(i -> i.equals(instant))) {
+        return this.getInstantDetails(instant);
+      } else {
+        return timeline.getInstantDetails(instant);
+      }
+    };
+    return new HoodieDefaultTimeline(instantStream, details);
+  }
 }
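mergeTimeline concatenates and sorts the instants of both timelines, and serves instant details from whichever timeline owns the instant, preferring this timeline on a match. A minimal sketch of merging archived history behind the active timeline, assuming a HoodieTableMetaClient named metaClient is in scope:

    // Instants from both timelines, sorted; details resolved by the owning timeline.
    HoodieArchivedTimeline archived = metaClient.getArchivedTimeline();
    HoodieActiveTimeline active = metaClient.getActiveTimeline();
    HoodieDefaultTimeline merged = archived.mergeTimeline(active);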
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index de8c5821c1118..9f5f4c23d0761 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -131,7 +131,6 @@ public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient me
         getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
   }
 
-
   /**
    * Get extra metadata for specified key from latest commit/deltacommit/replacecommit instant including internal commits
    * such as clustering.
@@ -177,4 +176,13 @@ private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, Hood
       throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
     }
   }
+
+  public static HoodieDefaultTimeline getTimeline(HoodieTableMetaClient metaClient, boolean includeArchivedTimeline) {
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    if (includeArchivedTimeline) {
+      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+      return archivedTimeline.mergeTimeline(activeTimeline);
+    }
+    return activeTimeline;
+  }
 }
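Callers that may need archived instants can now go through this helper instead of merging timelines by hand. A minimal usage sketch, again assuming a HoodieTableMetaClient named metaClient is in scope; the println is illustrative only:

    // includeArchivedTimeline = true folds archived instants in behind the active timeline.
    HoodieDefaultTimeline timeline = TimelineUtils.getTimeline(metaClient, true);
    timeline.getCommitsTimeline().filterCompletedInstants().getInstants()
        .forEach(instant -> System.out.println(instant.getTimestamp()));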