@@ -84,7 +84,7 @@ private static String print(Table buffer) {
buffer.getFieldNames().toArray(header);

String[][] rows =
buffer.getRenderRows().stream().map(l -> l.stream().toArray(String[]::new)).toArray(String[][]::new);
buffer.getRenderRows().stream().map(l -> l.toArray(new String[l.size()])).toArray(String[][]::new);
return printTextTable(header, rows);
}
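For reference, a minimal standalone sketch (not part of the PR) of the two list-to-array conversions in the hunk above: the old form goes through a stream with an array-constructor reference, the new form converts directly from the collection and skips the intermediate stream. Class name and contents are invented for illustration.

import java.util.Arrays;
import java.util.List;

public class ToArrayDemo {
  public static void main(String[] args) {
    List<String> cells = Arrays.asList("a", "b", "c");

    // Old form: build a stream, then collect it into a String[].
    String[] viaStream = cells.stream().toArray(String[]::new);

    // New form: convert the list directly, no intermediate stream.
    String[] direct = cells.toArray(new String[cells.size()]);

    System.out.println(Arrays.equals(viaStream, direct)); // true
  }
}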

36 changes: 14 additions & 22 deletions hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
@@ -22,7 +22,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -89,7 +88,7 @@ public Table add(List<Comparable> row) {
* @return
*/
public Table addAll(List<List<Comparable>> rows) {
rows.forEach(r -> add(r));
rows.forEach(this::add);
return this;
}
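A small self-contained sketch (class, field, and data invented) of the lambda-to-method-reference change above: a single-argument lambda that only delegates to an instance method can be written as a bound method reference.

import java.util.Arrays;
import java.util.List;

public class ForEachDemo {
  private final StringBuilder sink = new StringBuilder();

  private void add(String value) {
    sink.append(value).append(';');
  }

  public static void main(String[] args) {
    ForEachDemo demo = new ForEachDemo();
    List<String> rows = Arrays.asList("r1", "r2");

    rows.forEach(r -> demo.add(r)); // lambda that only forwards its argument
    rows.forEach(demo::add);        // equivalent bound method reference

    System.out.println(demo.sink);  // r1;r2;r1;r2;
  }
}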

@@ -120,16 +119,11 @@ public Table flip() {
*/
private List<List<Comparable>> orderRows() {
return orderingFieldNameOptional.map(orderingColumnName -> {
return rawRows.stream().sorted(new Comparator<List<Comparable>>() {
@Override
public int compare(List<Comparable> row1, List<Comparable> row2) {
Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
return isDescendingOptional.map(isDescending -> {
return isDescending ? -1 * cmpRawResult : cmpRawResult;
}).orElse(cmpRawResult);
}
return rawRows.stream().sorted((row1, row2) -> {
Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
return isDescendingOptional.map(isDescending -> isDescending ? -1 * cmpRawResult : cmpRawResult).orElse(cmpRawResult);
}).collect(Collectors.toList());
}).orElse(rawRows);
}
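A self-contained sketch (data, column index, and class name invented) of the comparator-as-lambda pattern that replaces the anonymous Comparator in orderRows: rows are sorted by one column, and an optional flag flips the comparison for descending order.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class OrderRowsDemo {
  public static void main(String[] args) {
    List<Comparable> r1 = Arrays.asList("20190103", "c3");
    List<Comparable> r2 = Arrays.asList("20190101", "c1");
    List<List<Comparable>> rawRows = Arrays.asList(r1, r2);

    int sortColumn = 0;                               // index of the ordering column
    Optional<Boolean> descending = Optional.of(true); // empty means natural order

    List<List<Comparable>> ordered = rawRows.stream()
        .sorted((row1, row2) -> {
          int cmp = row1.get(sortColumn).compareTo(row2.get(sortColumn));
          return descending.map(d -> d ? -1 * cmp : cmp).orElse(cmp);
        })
        .collect(Collectors.toList());

    System.out.println(ordered); // [[20190103, c3], [20190101, c1]]
  }
}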
@@ -141,16 +135,14 @@ private void sortAndLimit() {
this.renderRows = new ArrayList<>();
final int limit = this.limitOptional.orElse(rawRows.size());
final List<List<Comparable>> orderedRows = orderRows();
renderRows = orderedRows.stream().limit(limit).map(row -> {
return IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
String fieldName = rowHeader.get(idx);
if (fieldNameToConverterMap.containsKey(fieldName)) {
return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
}
Object v = row.get(idx);
return v == null ? "null" : v.toString();
}).collect(Collectors.toList());
}).collect(Collectors.toList());
renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
String fieldName = rowHeader.get(idx);
if (fieldNameToConverterMap.containsKey(fieldName)) {
return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
}
Object v = row.get(idx);
return v == null ? "null" : v.toString();
}).collect(Collectors.toList())).collect(Collectors.toList());
}

@Override
@@ -89,29 +89,27 @@ public String showArchivedCommits(
.deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
final String instantTime = r.get("commitTime").toString();
final String action = r.get("actionType").toString();
return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> {
return hoodieWriteStats.stream().map(hoodieWriteStat -> {
List<Comparable> row = new ArrayList<>();
row.add(action);
row.add(instantTime);
row.add(hoodieWriteStat.getPartitionPath());
row.add(hoodieWriteStat.getFileId());
row.add(hoodieWriteStat.getPrevCommit());
row.add(hoodieWriteStat.getNumWrites());
row.add(hoodieWriteStat.getNumInserts());
row.add(hoodieWriteStat.getNumDeletes());
row.add(hoodieWriteStat.getNumUpdateWrites());
row.add(hoodieWriteStat.getTotalLogFiles());
row.add(hoodieWriteStat.getTotalLogBlocks());
row.add(hoodieWriteStat.getTotalCorruptLogBlock());
row.add(hoodieWriteStat.getTotalRollbackBlocks());
row.add(hoodieWriteStat.getTotalLogRecords());
row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
row.add(hoodieWriteStat.getTotalWriteBytes());
row.add(hoodieWriteStat.getTotalWriteErrors());
return row;
});
}).map(rowList -> rowList.toArray(new Comparable[0]));
return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
List<Comparable> row = new ArrayList<>();
row.add(action);
row.add(instantTime);
row.add(hoodieWriteStat.getPartitionPath());
row.add(hoodieWriteStat.getFileId());
row.add(hoodieWriteStat.getPrevCommit());
row.add(hoodieWriteStat.getNumWrites());
row.add(hoodieWriteStat.getNumInserts());
row.add(hoodieWriteStat.getNumDeletes());
row.add(hoodieWriteStat.getNumUpdateWrites());
row.add(hoodieWriteStat.getTotalLogFiles());
row.add(hoodieWriteStat.getTotalLogBlocks());
row.add(hoodieWriteStat.getTotalCorruptLogBlock());
row.add(hoodieWriteStat.getTotalRollbackBlocks());
row.add(hoodieWriteStat.getTotalLogRecords());
row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
row.add(hoodieWriteStat.getTotalWriteBytes());
row.add(hoodieWriteStat.getTotalWriteErrors());
return row;
})).map(rowList -> rowList.toArray(new Comparable[0]));
}).collect(Collectors.toList());
allStats.addAll(readCommits);
reader.close();
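For context, a minimal sketch of the flatMap pipeline that this hunk collapses into a single expression: one output row per write stat, across all partitions. The map, the stat type, and the row layout are simplified stand-ins, not the real HoodieWriteStat fields.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapDemo {
  public static void main(String[] args) {
    // Partition path -> per-file write stats (placeholders).
    Map<String, List<String>> partitionToWriteStats = new HashMap<>();
    partitionToWriteStats.put("2019/01/01", Arrays.asList("file-1", "file-2"));
    partitionToWriteStats.put("2019/01/02", Arrays.asList("file-3"));

    String action = "commit";

    List<Comparable[]> rows = partitionToWriteStats.values().stream()
        // flatMap turns the stream of per-partition lists into one flat stream of stats.
        .flatMap(stats -> stats.stream().map(stat -> {
          List<Comparable> row = new ArrayList<>();
          row.add(action);
          row.add(stat);
          return row;
        }))
        .map(row -> row.toArray(new Comparable[0]))
        .collect(Collectors.toList());

    System.out.println(rows.size()); // 3
  }
}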
@@ -183,14 +181,7 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) {
}
break;
}
case HoodieTimeline.COMMIT_ACTION: {
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
if (!skipMetadata) {
commitDetails.add(record.get("hoodieCommitMetadata").toString());
}
break;
}
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION: {
commitDetails.add(record.get("commitTime"));
commitDetails.add(record.get("actionType").toString());
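A stripped-down illustration (constants and row contents are placeholders) of the case-label grouping applied above: COMMIT_ACTION and DELTA_COMMIT_ACTION build identical rows, so the duplicated block is removed and both labels share one block.

import java.util.ArrayList;
import java.util.List;

public class CaseGroupingDemo {
  static final String COMMIT_ACTION = "commit";
  static final String DELTA_COMMIT_ACTION = "deltacommit";
  static final String CLEAN_ACTION = "clean";

  static List<String> describe(String actionType) {
    List<String> details = new ArrayList<>();
    switch (actionType) {
      case COMMIT_ACTION:
      case DELTA_COMMIT_ACTION: {
        // Both actions produce the same details, so they share one block.
        details.add(actionType);
        details.add("commit-like");
        break;
      }
      case CLEAN_ACTION: {
        details.add(actionType);
        details.add("cleaner");
        break;
      }
      default:
        details.add("unknown");
    }
    return details;
  }

  public static void main(String[] args) {
    System.out.println(describe(DELTA_COMMIT_ACTION)); // [deltacommit, commit-like]
  }
}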
@@ -66,12 +66,11 @@ public String showCleans(
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
List<HoodieInstant> cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (int i = 0; i < cleans.size(); i++) {
HoodieInstant clean = cleans.get(i);
for (HoodieInstant clean : cleans) {
HoodieCleanMetadata cleanMetadata =
AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
rows.add(new Comparable[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
}
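The indexed loop is swapped for an enhanced for loop here (and in the other commands below); a minimal equivalent pair for comparison, with made-up list contents.

import java.util.Arrays;
import java.util.List;

public class ForLoopDemo {
  public static void main(String[] args) {
    List<String> cleans = Arrays.asList("clean-001", "clean-002");

    // Index-based iteration: the index i is only used to fetch the element.
    for (int i = 0; i < cleans.size(); i++) {
      System.out.println(cleans.get(i));
    }

    // Enhanced for loop: same traversal order, no index bookkeeping.
    for (String clean : cleans) {
      System.out.println(clean);
    }
  }
}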

TableHeader header =
@@ -110,8 +109,8 @@ public String showCleanPartitions(@CliOption(key = {"clean"}, help = "clean to s
String path = entry.getKey();
HoodieCleanPartitionMetadata stats = entry.getValue();
String policy = stats.getPolicy();
Integer totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
Integer totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
int totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
int totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
rows.add(new Comparable[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
}
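On the Integer-to-int change just above, a tiny sketch (file names invented) of the difference: the primitive local skips the auto-boxing of the size() result and can never be null.

import java.util.Arrays;
import java.util.List;

public class BoxingDemo {
  public static void main(String[] args) {
    List<String> successDeleteFiles = Arrays.asList("f1", "f2", "f3");

    Integer boxedCount = successDeleteFiles.size(); // int result auto-boxed to Integer
    int primitiveCount = successDeleteFiles.size(); // stays primitive, cannot be null

    System.out.println(boxedCount + " vs " + primitiveCount); // 3 vs 3
  }
}

In the command itself the counts are still boxed when they are stored into the Comparable[] row, so the change is mostly about declaring intent rather than saving allocations.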

@@ -53,7 +53,7 @@ public class CommitsCommand implements CommandMarker {

@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -65,8 +65,7 @@ public String showCommits(
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (int i = 0; i < commits.size(); i++) {
HoodieInstant commit = commits.get(i);
for (HoodieInstant commit : commits) {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
@@ -76,9 +75,7 @@
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> {
return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
});
fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));

TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
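On the Double.valueOf to Double.parseDouble change above: both parse the string, but parseDouble returns the primitive directly instead of boxing a Double that is immediately unboxed. A tiny sketch with an invented byte count:

public class ParseDemo {
  public static void main(String[] args) {
    String totalBytesWritten = "1048576";

    double viaValueOf = Double.valueOf(totalBytesWritten);   // boxes to Double, then unboxes
    double viaParse = Double.parseDouble(totalBytesWritten); // primitive all the way

    System.out.println(viaValueOf == viaParse); // true
  }
}

The same reasoning applies to the Long.valueOf to Long.parseLong change further down in this file.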
@@ -95,7 +92,7 @@ public String refreshCommits() throws IOException {

@CliCommand(value = "commit rollback", help = "Rollback a commit")
public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime,
@CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -163,9 +160,7 @@ public String showCommitPartitions(@CliOption(key = {"commit"}, help = "Commit t
}

Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
fieldNameToConverterMap.put("Total Bytes Written", entry -> {
return NumericUtils.humanReadableByteCount((Long.valueOf(entry.toString())));
});
fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));

TableHeader header = new TableHeader().addTableHeaderField("Partition Path")
.addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -240,8 +235,7 @@ public String compareCommits(@CliOption(key = {"path"}, help = "Path of the data
}

@CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset")
public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
throws Exception {
public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) {
HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
HoodieCLI.state = HoodieCLI.CLIState.SYNC;
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
@@ -38,11 +38,12 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.func.OperationResult;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
@@ -85,7 +86,7 @@ private HoodieTableMetaClient checkAndGetMetaClient() {
public String compactionsAll(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -100,10 +101,9 @@ public String compactionsAll(

List<HoodieInstant> instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (int i = 0; i < instants.size(); i++) {
HoodieInstant instant = instants.get(i);
for (HoodieInstant instant : instants) {
HoodieCompactionPlan compactionPlan = null;
if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
try {
// This could be a completed compaction. Assume a compaction request file is present but skip if fails
compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -118,7 +118,7 @@
}

if (null != compactionPlan) {
HoodieInstant.State state = instant.getState();
State state = instant.getState();
if (committed.contains(instant.getTimestamp())) {
state = State.COMPLETED;
}
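A few lines up, the action check is flipped so the HoodieTimeline.COMPACTION_ACTION constant is the receiver of equals. A common reason for constant-first equals is null-safety; a minimal sketch with placeholder values:

public class EqualsOrderDemo {
  static final String COMPACTION_ACTION = "compaction";

  public static void main(String[] args) {
    String action = null; // stand-in for a value that might be null

    // Constant-first: simply returns false when the argument is null.
    System.out.println(!COMPACTION_ACTION.equals(action)); // true

    // Variable-first would throw a NullPointerException here:
    // action.equals(COMPACTION_ACTION);
  }
}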
@@ -146,7 +146,7 @@ public String compactionsAll(
public String compactionShow(
@CliOption(key = "instant", mandatory = true,
help = "Base path for the target hoodie dataset") final String compactionInstantTime,
@CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -212,8 +212,7 @@ public String compact(
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
@CliOption(key = "compactionInstant", mandatory = false,
help = "Base path for the target hoodie dataset") String compactionInstantTime,
@CliOption(key = "compactionInstant", help = "Base path for the target hoodie dataset") String compactionInstantTime,
@CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
unspecifiedDefaultValue = "") final String propsFilePath,
@CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
@@ -286,7 +285,7 @@ public String validateCompaction(

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = null;
String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -300,10 +299,10 @@
return "Failed to validate compaction for " + compactionInstant;
}
List<ValidationOpResult> res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> {
res.forEach(r -> {
Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
@@ -347,7 +346,7 @@ public String unscheduleCompaction(

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -391,7 +390,7 @@ public String unscheduleCompactFile(

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -437,7 +436,7 @@ public String repairCompaction(

String outputPathStr = getTmpSerializerFile();
Path outputPath = new Path(outputPathStr);
String output = "";
String output;
try {
String sparkPropertiesPath = Utils
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -476,7 +475,7 @@ private String getRenamesToBePrinted(List<RenameOpResult> res, Integer limit, St
}

List<Comparable[]> rows = new ArrayList<>();
res.stream().forEach(r -> {
res.forEach(r -> {
Comparable[] row =
new Comparable[] {r.getOperation().fileId, r.getOperation().srcPath, r.getOperation().destPath,
r.isExecuted(), r.isSuccess(), r.getException().isPresent() ? r.getException().get().getMessage() : ""};