diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index 53254320a3374..53114cefad585 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -84,7 +84,7 @@ private static String print(Table buffer) {
     buffer.getFieldNames().toArray(header);
     String[][] rows =
-        buffer.getRenderRows().stream().map(l -> l.stream().toArray(String[]::new)).toArray(String[][]::new);
+        buffer.getRenderRows().stream().map(l -> l.toArray(new String[l.size()])).toArray(String[][]::new);
     return printTextTable(header, rows);
   }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
index bebc7fc8d3e9d..8158eef8d5f84 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -89,7 +88,7 @@ public Table add(List row) {
    * @return
    */
   public Table addAll(List> rows) {
-    rows.forEach(r -> add(r));
+    rows.forEach(this::add);
     return this;
   }
@@ -120,16 +119,11 @@ public Table flip() {
    */
   private List> orderRows() {
     return orderingFieldNameOptional.map(orderingColumnName -> {
-      return rawRows.stream().sorted(new Comparator>() {
-        @Override
-        public int compare(List row1, List row2) {
-          Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
-          Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
-          int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
-          return isDescendingOptional.map(isDescending -> {
-            return isDescending ? -1 * cmpRawResult : cmpRawResult;
-          }).orElse(cmpRawResult);
-        }
+      return rawRows.stream().sorted((row1, row2) -> {
+        Comparable fieldForRow1 = row1.get(rowHeader.indexOf(orderingColumnName));
+        Comparable fieldForRow2 = row2.get(rowHeader.indexOf(orderingColumnName));
+        int cmpRawResult = fieldForRow1.compareTo(fieldForRow2);
+        return isDescendingOptional.map(isDescending -> isDescending ? -1 * cmpRawResult : cmpRawResult).orElse(cmpRawResult);
       }).collect(Collectors.toList());
     }).orElse(rawRows);
   }
@@ -141,16 +135,14 @@ private void sortAndLimit() {
     this.renderRows = new ArrayList<>();
     final int limit = this.limitOptional.orElse(rawRows.size());
     final List> orderedRows = orderRows();
-    renderRows = orderedRows.stream().limit(limit).map(row -> {
-      return IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
-        String fieldName = rowHeader.get(idx);
-        if (fieldNameToConverterMap.containsKey(fieldName)) {
-          return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
-        }
-        Object v = row.get(idx);
-        return v == null ? "null" : v.toString();
-      }).collect(Collectors.toList());
-    }).collect(Collectors.toList());
+    renderRows = orderedRows.stream().limit(limit).map(row -> IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
+      String fieldName = rowHeader.get(idx);
+      if (fieldNameToConverterMap.containsKey(fieldName)) {
+        return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+      }
+      Object v = row.get(idx);
+      return v == null ? "null" : v.toString();
+    }).collect(Collectors.toList())).collect(Collectors.toList());
   }
 
   @Override
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index c8f1dc8244641..f45550484c22c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -89,29 +89,27 @@ public String showArchivedCommits(
             .deepCopy(HoodieCommitMetadata.SCHEMA$, r.get("hoodieCommitMetadata"));
         final String instantTime = r.get("commitTime").toString();
         final String action = r.get("actionType").toString();
-        return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> {
-          return hoodieWriteStats.stream().map(hoodieWriteStat -> {
-            List row = new ArrayList<>();
-            row.add(action);
-            row.add(instantTime);
-            row.add(hoodieWriteStat.getPartitionPath());
-            row.add(hoodieWriteStat.getFileId());
-            row.add(hoodieWriteStat.getPrevCommit());
-            row.add(hoodieWriteStat.getNumWrites());
-            row.add(hoodieWriteStat.getNumInserts());
-            row.add(hoodieWriteStat.getNumDeletes());
-            row.add(hoodieWriteStat.getNumUpdateWrites());
-            row.add(hoodieWriteStat.getTotalLogFiles());
-            row.add(hoodieWriteStat.getTotalLogBlocks());
-            row.add(hoodieWriteStat.getTotalCorruptLogBlock());
-            row.add(hoodieWriteStat.getTotalRollbackBlocks());
-            row.add(hoodieWriteStat.getTotalLogRecords());
-            row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
-            row.add(hoodieWriteStat.getTotalWriteBytes());
-            row.add(hoodieWriteStat.getTotalWriteErrors());
-            return row;
-          });
-        }).map(rowList -> rowList.toArray(new Comparable[0]));
+        return metadata.getPartitionToWriteStats().values().stream().flatMap(hoodieWriteStats -> hoodieWriteStats.stream().map(hoodieWriteStat -> {
+          List row = new ArrayList<>();
+          row.add(action);
+          row.add(instantTime);
+          row.add(hoodieWriteStat.getPartitionPath());
+          row.add(hoodieWriteStat.getFileId());
+          row.add(hoodieWriteStat.getPrevCommit());
+          row.add(hoodieWriteStat.getNumWrites());
+          row.add(hoodieWriteStat.getNumInserts());
+          row.add(hoodieWriteStat.getNumDeletes());
+          row.add(hoodieWriteStat.getNumUpdateWrites());
+          row.add(hoodieWriteStat.getTotalLogFiles());
+          row.add(hoodieWriteStat.getTotalLogBlocks());
+          row.add(hoodieWriteStat.getTotalCorruptLogBlock());
+          row.add(hoodieWriteStat.getTotalRollbackBlocks());
+          row.add(hoodieWriteStat.getTotalLogRecords());
+          row.add(hoodieWriteStat.getTotalUpdatedRecordsCompacted());
+          row.add(hoodieWriteStat.getTotalWriteBytes());
+          row.add(hoodieWriteStat.getTotalWriteErrors());
+          return row;
+        })).map(rowList -> rowList.toArray(new Comparable[0]));
       }).collect(Collectors.toList());
       allStats.addAll(readCommits);
       reader.close();
@@ -183,14 +181,7 @@ private Comparable[] readCommit(GenericRecord record, boolean skipMetadata) {
         }
         break;
       }
-      case HoodieTimeline.COMMIT_ACTION: {
-        commitDetails.add(record.get("commitTime"));
-        commitDetails.add(record.get("actionType").toString());
-        if (!skipMetadata) {
-          commitDetails.add(record.get("hoodieCommitMetadata").toString());
-        }
-        break;
-      }
+      case HoodieTimeline.COMMIT_ACTION:
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
         commitDetails.add(record.get("commitTime"));
         commitDetails.add(record.get("actionType").toString());
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
index 857fb0d84b5e3..7c304980e5c37 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java
@@ -66,12 +66,11 @@ public String showCleans(
     HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
     List cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List rows = new ArrayList<>();
-    for (int i = 0; i < cleans.size(); i++) {
-      HoodieInstant clean = cleans.get(i);
+    for (HoodieInstant clean : cleans) {
       HoodieCleanMetadata cleanMetadata =
-          AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
-      rows.add(new Comparable[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
-          cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
+              AvroUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+      rows.add(new Comparable[]{clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
+              cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
     }
 
     TableHeader header =
@@ -110,8 +109,8 @@ public String showCleanPartitions(@CliOption(key = {"clean"}, help = "clean to s
       String path = entry.getKey();
       HoodieCleanPartitionMetadata stats = entry.getValue();
       String policy = stats.getPolicy();
-      Integer totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
-      Integer totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
+      int totalSuccessDeletedFiles = stats.getSuccessDeleteFiles().size();
+      int totalFailedDeletedFiles = stats.getFailedDeleteFiles().size();
       rows.add(new Comparable[] {path, policy, totalSuccessDeletedFiles, totalFailedDeletedFiles});
     }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 089f6f462a56f..c5b2faab93b5b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -53,7 +53,7 @@ public class CommitsCommand implements CommandMarker {
 
   @CliCommand(value = "commits show", help = "Show the commits")
   public String showCommits(
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
          unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -65,8 +65,7 @@ public String showCommits(
     HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
     List commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List rows = new ArrayList<>();
-    for (int i = 0; i < commits.size(); i++) {
-      HoodieInstant commit = commits.get(i);
+    for (HoodieInstant commit : commits) {
       HoodieCommitMetadata commitMetadata =
           HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
       rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
@@ -76,9 +75,7 @@ public String showCommits(
     }
 
     Map> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
-      return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
-    });
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
 
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
         .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -95,7 +92,7 @@ public String refreshCommits() throws IOException {
 
   @CliCommand(value = "commit rollback", help = "Rollback a commit")
   public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to rollback") final String commitTime,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
       throws Exception {
     HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
     HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -163,9 +160,7 @@ public String showCommitPartitions(@CliOption(key = {"commit"}, help = "Commit t
     }
 
     Map> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
-      return NumericUtils.humanReadableByteCount((Long.valueOf(entry.toString())));
-    });
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
 
     TableHeader header = new TableHeader().addTableHeaderField("Partition Path")
         .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
@@ -240,8 +235,7 @@ public String compareCommits(@CliOption(key = {"path"}, help = "Path of the data
   }
 
   @CliCommand(value = "commits sync", help = "Compare commits with another Hoodie dataset")
-  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path)
-      throws Exception {
+  public String syncCommits(@CliOption(key = {"path"}, help = "Path of the dataset to compare to") final String path) {
     HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
     HoodieCLI.state = HoodieCLI.CLIState.SYNC;
     return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 6a188c1827ac6..3a518ee16bb21 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -38,11 +38,12 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.func.OperationResult;
+import org.apache.hudi.utilities.UtilHelpers;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
@@ -85,7 +86,7 @@ private HoodieTableMetaClient checkAndGetMetaClient() {
   public String compactionsAll(
       @CliOption(key = {"includeExtraMetadata"}, help = "Include extra metadata",
           unspecifiedDefaultValue = "false") final boolean includeExtraMetadata,
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
          unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -100,10 +101,9 @@ public String compactionsAll(
     List instants = timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List rows = new ArrayList<>();
-    for (int i = 0; i < instants.size(); i++) {
-      HoodieInstant instant = instants.get(i);
+    for (HoodieInstant instant : instants) {
       HoodieCompactionPlan compactionPlan = null;
-      if (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION)) {
+      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
         try {
           // This could be a completed compaction. Assume a compaction request file is present but skip if fails
           compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -118,7 +118,7 @@ public String compactionsAll(
       }
 
       if (null != compactionPlan) {
-        HoodieInstant.State state = instant.getState();
+        State state = instant.getState();
         if (committed.contains(instant.getTimestamp())) {
           state = State.COMPLETED;
         }
@@ -146,7 +146,7 @@ public String compactionsAll(
   public String compactionShow(
       @CliOption(key = "instant", mandatory = true,
           help = "Base path for the target hoodie dataset") final String compactionInstantTime,
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
          unspecifiedDefaultValue = "-1") final Integer limit,
      @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") final String sortByField,
      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@@ -212,8 +212,7 @@ public String compact(
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
          help = "Spark executor memory") final String sparkMemory,
      @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
-      @CliOption(key = "compactionInstant", mandatory = false,
-          help = "Base path for the target hoodie dataset") String compactionInstantTime,
+      @CliOption(key = "compactionInstant", help = "Base path for the target hoodie dataset") String compactionInstantTime,
       @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
          unspecifiedDefaultValue = "") final String propsFilePath,
      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
@@ -286,7 +285,7 @@ public String validateCompaction(
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = null;
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -300,10 +299,10 @@ public String validateCompaction(
         return "Failed to validate compaction for " + compactionInstant;
       }
       List res = deSerializeOperationResult(outputPathStr, HoodieCLI.fs);
-      boolean valid = res.stream().map(r -> r.isSuccess()).reduce(Boolean::logicalAnd).orElse(true);
+      boolean valid = res.stream().map(OperationResult::isSuccess).reduce(Boolean::logicalAnd).orElse(true);
       String message = "\n\n\t COMPACTION PLAN " + (valid ? "VALID" : "INVALID") + "\n\n";
       List rows = new ArrayList<>();
-      res.stream().forEach(r -> {
+      res.forEach(r -> {
         Comparable[] row = new Comparable[] {r.getOperation().getFileId(), r.getOperation().getBaseInstantTime(),
             r.getOperation().getDataFileName().isPresent() ? r.getOperation().getDataFileName().get() : "",
             r.getOperation().getDeltaFileNames().size(), r.isSuccess(),
@@ -347,7 +346,7 @@ public String unscheduleCompaction(
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -391,7 +390,7 @@ public String unscheduleCompactFile(
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -437,7 +436,7 @@ public String repairCompaction(
     String outputPathStr = getTmpSerializerFile();
     Path outputPath = new Path(outputPathStr);
-    String output = "";
+    String output;
     try {
       String sparkPropertiesPath = Utils
           .getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
@@ -476,7 +475,7 @@ private String getRenamesToBePrinted(List res, Integer limit, St
     }
 
     List rows = new ArrayList<>();
-    res.stream().forEach(r -> {
+    res.forEach(r -> {
       Comparable[] row = new Comparable[] {r.getOperation().fileId, r.getOperation().srcPath,
           r.getOperation().destPath, r.isExecuted(), r.isSuccess(),
           r.getException().isPresent() ? r.getException().get().getMessage() : ""};
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
index d5d1e82d6a73c..302931eb76a98 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/DatasetsCommand.java
@@ -49,14 +49,14 @@ public class DatasetsCommand implements CommandMarker {
   @CliCommand(value = "connect", help = "Connect to a hoodie dataset")
   public String connect(
       @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the dataset") final String path,
-      @CliOption(key = {"layoutVersion"}, mandatory = false, help = "Timeline Layout version") Integer layoutVersion,
-      @CliOption(key = {"eventuallyConsistent"}, mandatory = false, unspecifiedDefaultValue = "false",
+      @CliOption(key = {"layoutVersion"}, help = "Timeline Layout version") Integer layoutVersion,
+      @CliOption(key = {"eventuallyConsistent"}, unspecifiedDefaultValue = "false",
           help = "Enable eventual consistency") final boolean eventuallyConsistent,
-      @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "2000",
+      @CliOption(key = {"initialCheckIntervalMs"}, unspecifiedDefaultValue = "2000",
           help = "Initial wait time for eventual consistency") final Integer initialConsistencyIntervalMs,
-      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "300000",
+      @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "300000",
           help = "Max wait time for eventual consistency") final Integer maxConsistencyIntervalMs,
-      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, unspecifiedDefaultValue = "7",
+      @CliOption(key = {"maxCheckIntervalMs"}, unspecifiedDefaultValue = "7",
           help = "Max checks for eventual consistency") final Integer maxConsistencyChecks)
       throws IOException {
     HoodieCLI
@@ -118,7 +118,7 @@ public String createTable(
   /**
    * Describes table properties.
    */
-  @CliCommand(value = "desc", help = "Describle Hoodie Table properties")
+  @CliCommand(value = "desc", help = "Describe Hoodie Table properties")
   public String descTable() {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     TableHeader header = new TableHeader().addTableHeaderField("Property").addTableHeaderField("Value");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index e94e16a3e29ff..597bab3968bb4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
@@ -90,13 +91,13 @@ public String showAllFileSlices(
           row[idx++] = fs.getDataFile().isPresent() ? fs.getDataFile().get().getFileSize() : -1;
           if (!readOptimizedOnly) {
             row[idx++] = fs.getLogFiles().count();
-            row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+            row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
             row[idx++] = fs.getLogFiles().collect(Collectors.toList()).toString();
           }
           rows.add(row);
         }));
     Function converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Total Delta File Size", converterFunction);
     fieldNameToConverterMap.put("Data-File Size", converterFunction);
@@ -160,15 +161,15 @@ public String showLatestFileSlices(
 
           if (!readOptimizedOnly) {
             row[idx++] = fs.getLogFiles().count();
-            row[idx++] = fs.getLogFiles().mapToLong(lf -> lf.getFileSize()).sum();
+            row[idx++] = fs.getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
             long logFilesScheduledForCompactionTotalSize =
                 fs.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
-                    .mapToLong(lf -> lf.getFileSize()).sum();
+                    .mapToLong(HoodieLogFile::getFileSize).sum();
             row[idx++] = logFilesScheduledForCompactionTotalSize;
 
             long logFilesUnscheduledTotalSize =
                 fs.getLogFiles().filter(lf -> !lf.getBaseCommitTime().equals(fs.getBaseInstantTime()))
-                    .mapToLong(lf -> lf.getFileSize()).sum();
+                    .mapToLong(HoodieLogFile::getFileSize).sum();
             row[idx++] = logFilesUnscheduledTotalSize;
 
             double logSelectedForCompactionToBaseRatio =
@@ -186,7 +187,7 @@ public String showLatestFileSlices(
         });
 
     Function converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Data-File Size", converterFunction);
     if (!readOptimizedOnly) {
@@ -230,9 +231,9 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
     FileSystem fs = HoodieCLI.fs;
     String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
     FileStatus[] statuses = fs.globStatus(new Path(globPath));
-    Stream instantsStream = null;
+    Stream instantsStream;
 
-    HoodieTimeline timeline = null;
+    HoodieTimeline timeline;
     if (readOptimizedOnly) {
       timeline = metaClient.getActiveTimeline().getCommitTimeline();
     } else if (excludeCompaction) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
index 4ba9d3d6fd278..4c814758fbc5d 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HDFSParquetImportCommand.java
@@ -23,8 +23,8 @@
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.utilities.HDFSParquetImporter.FormatValidator;
-
 import org.apache.hudi.utilities.UtilHelpers;
+
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.util.Utils;
 import org.springframework.shell.core.CommandMarker;
@@ -42,7 +42,7 @@ public class HDFSParquetImportCommand implements CommandMarker {
   @CliCommand(value = "hdfsparquetimport", help = "Imports Parquet dataset to a hoodie dataset")
   public String convert(
-      @CliOption(key = "upsert", mandatory = false, unspecifiedDefaultValue = "false",
+      @CliOption(key = "upsert", unspecifiedDefaultValue = "false",
           help = "Uses upsert API instead of the default insert API of WriteClient") boolean useUpsert,
       @CliOption(key = "srcPath", mandatory = true, help = "Base path for the input dataset") final String srcPath,
       @CliOption(key = "targetPath", mandatory = true,
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f9bea47859c52..8a50309d2dc88 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -38,8 +38,10 @@
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.hive.util.SchemaUtil;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileStatus;
@@ -84,14 +86,13 @@ public String showLogFileCommits(
         .map(status -> status.getPath().toString()).collect(Collectors.toList());
     Map, Map>, Integer>>> commitCountAndMetadata =
         Maps.newHashMap();
-    int totalEntries = 0;
     int numCorruptBlocks = 0;
     int dummyInstantTimeCount = 0;
 
     for (String logFilePath : logFilePaths) {
       FileStatus[] fsStatus = fs.listStatus(new Path(logFilePath));
       Schema writerSchema = new AvroSchemaConverter()
-          .convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath)));
+          .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePath))));
       Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(fsStatus[0].getPath()), writerSchema);
 
       // read the avro blocks
@@ -124,14 +125,12 @@ public String showLogFileCommits(
         if (commitCountAndMetadata.containsKey(instantTime)) {
           commitCountAndMetadata.get(instantTime).add(
               new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
-          totalEntries++;
         } else {
           List, Map>, Integer>> list = new ArrayList<>();
           list.add(
               new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
           commitCountAndMetadata.put(instantTime, list);
-          totalEntries++;
         }
       }
       reader.close();
@@ -141,7 +140,7 @@ public String showLogFileCommits(
     ObjectMapper objectMapper = new ObjectMapper();
     for (Map.Entry, Map>, Integer>>> entry : commitCountAndMetadata
         .entrySet()) {
-      String instantTime = entry.getKey().toString();
+      String instantTime = entry.getKey();
       for (Tuple3, Map>, Integer> tuple3 : entry
           .getValue()) {
         Comparable[] output = new Comparable[5];
@@ -163,11 +162,11 @@ public String showLogFileCommits(
 
   @CliCommand(value = "show logfile records", help = "Read records from log files")
   public String showLogFileRecords(
-      @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits",
+      @CliOption(key = {"limit"}, help = "Limit commits",
          unspecifiedDefaultValue = "10") final Integer limit,
      @CliOption(key = "logFilePathPattern", mandatory = true,
          help = "Fully qualified paths for the log files") final String logFilePathPattern,
-      @CliOption(key = "mergeRecords", mandatory = false, help = "If the records in the log files should be merged",
+      @CliOption(key = "mergeRecords", help = "If the records in the log files should be merged",
          unspecifiedDefaultValue = "false") final Boolean shouldMerge)
      throws IOException {
 
@@ -182,7 +181,7 @@ public String showLogFileRecords(
     AvroSchemaConverter converter = new AvroSchemaConverter();
     // get schema from last log file
     Schema readerSchema =
-        converter.convert(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1))));
+        converter.convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(fs, new Path(logFilePaths.get(logFilePaths.size() - 1)))));
 
     List allRecords = new ArrayList<>();
 
@@ -191,11 +190,11 @@ public String showLogFileRecords(
       HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, client.getBasePath(), logFilePaths,
          readerSchema, client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-          Long.valueOf(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES),
-          Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-          Boolean.valueOf(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-          Integer.valueOf(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-          HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+          HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
+          Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
+          Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
+          HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
+          HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
       for (HoodieRecord hoodieRecord : scanner) {
         Option record = hoodieRecord.getData().getInsertValue(readerSchema);
         if (allRecords.size() < limit) {
@@ -205,7 +204,7 @@ public String showLogFileRecords(
     } else {
       for (String logFile : logFilePaths) {
         Schema writerSchema = new AvroSchemaConverter()
-            .convert(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile)));
+            .convert(Preconditions.checkNotNull(SchemaUtil.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
         HoodieLogFormat.Reader reader =
            HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
 
        // read the avro blocks
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
index 152e21c018751..5346b98d6a4f3 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieSyncCommand.java
@@ -31,6 +31,7 @@
 import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -50,7 +51,7 @@ public String validateSync(
          help = "total number of recent partitions to validate") final int partitionCount,
      @CliOption(key = {"hiveServerUrl"}, mandatory = true,
          help = "hiveServerURL to connect to") final String hiveServerUrl,
-      @CliOption(key = {"hiveUser"}, mandatory = false, unspecifiedDefaultValue = "",
+      @CliOption(key = {"hiveUser"}, unspecifiedDefaultValue = "",
          help = "hive username to connect to") final String hiveUser,
      @CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "",
          help = "hive password to connect to") final String hivePass)
@@ -80,33 +81,27 @@ public String validateSync(
     if (sourceLatestCommit != null
         && HoodieTimeline.compareTimestamps(targetLatestCommit, sourceLatestCommit, HoodieTimeline.GREATER)) {
       // source is behind the target
-      List commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
-          .getInstants().collect(Collectors.toList());
-      if (commitsToCatchup.isEmpty()) {
-        return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
-            + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
-      } else {
-        long newInserts = CommitUtil.countNewRecords(target,
-            commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
-        return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
-            + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
-            + newInserts;
-      }
+      return getString(target, targetTimeline, source, sourceCount, targetCount, sourceLatestCommit);
     } else {
-      List commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
-          .getInstants().collect(Collectors.toList());
-      if (commitsToCatchup.isEmpty()) {
-        return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
-            + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount);
-      } else {
-        long newInserts = CommitUtil.countNewRecords(source,
-            commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
-        return "Count difference now is (count(" + source.getTableConfig().getTableName() + ") - count("
-            + target.getTableConfig().getTableName() + ") == " + (sourceCount - targetCount) + ". Catch up count is "
-            + newInserts;
-      }
+      return getString(source, sourceTimeline, target, targetCount, sourceCount, targetLatestCommit);
     }
   }
 
+  private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
+      throws IOException {
+    List commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
+        .getInstants().collect(Collectors.toList());
+    if (commitsToCatchup.isEmpty()) {
+      return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+          + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount);
+    } else {
+      long newInserts = CommitUtil.countNewRecords(target,
+          commitsToCatchup.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()));
+      return "Count difference now is (count(" + target.getTableConfig().getTableName() + ") - count("
+          + source.getTableConfig().getTableName() + ") == " + (targetCount - sourceCount) + ". Catch up count is "
+          + newInserts;
+    }
+  }
+
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 4b4c2834043be..37f66f6ab3ff6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -49,7 +49,7 @@ public String deduplicate(
          mandatory = true) final String duplicatedPartitionPath,
      @CliOption(key = {"repairedOutputPath"}, help = "Location to place the repaired files",
          mandatory = true) final String repairedOutputPath,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path",
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path",
          mandatory = true) final String sparkPropertiesPath)
      throws Exception {
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
index a3eb6a7bac9cd..4a122c6f35261 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java
@@ -99,20 +99,18 @@ public String showRollback(
     HoodieRollbackMetadata metadata = AvroUtils.deserializeAvroMetadata(
         activeTimeline.getInstantDetails(new HoodieInstant(State.COMPLETED, ROLLBACK_ACTION, rollbackInstant)).get(),
         HoodieRollbackMetadata.class);
-    metadata.getPartitionMetadata().entrySet().forEach(e -> {
-      Stream
-          .concat(e.getValue().getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
-              e.getValue().getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
-          .forEach(fileWithDeleteStatus -> {
-            Comparable[] row = new Comparable[5];
-            row[0] = metadata.getStartRollbackTime();
-            row[1] = metadata.getCommitsRollback().toString();
-            row[2] = e.getKey();
-            row[3] = fileWithDeleteStatus.getLeft();
-            row[4] = fileWithDeleteStatus.getRight();
-            rows.add(row);
-          });
-    });
+    metadata.getPartitionMetadata().forEach((key, value) -> Stream
+        .concat(value.getSuccessDeleteFiles().stream().map(f -> Pair.of(f, true)),
+            value.getFailedDeleteFiles().stream().map(f -> Pair.of(f, false)))
+        .forEach(fileWithDeleteStatus -> {
+          Comparable[] row = new Comparable[5];
+          row[0] = metadata.getStartRollbackTime();
+          row[1] = metadata.getCommitsRollback().toString();
+          row[2] = key;
+          row[3] = fileWithDeleteStatus.getLeft();
+          row[4] = fileWithDeleteStatus.getRight();
+          rows.add(row);
+        }));
     TableHeader header = new TableHeader().addTableHeaderField("Instant").addTableHeaderField("Rolledback Instants")
         .addTableHeaderField("Partition").addTableHeaderField("Deleted File").addTableHeaderField("Succeeded");
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index 69a1584b19f20..d28ba277b179a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -93,7 +93,7 @@ public String savepoint(@CliOption(key = {"commit"}, help = "Commit to savepoint
   @CliCommand(value = "savepoint rollback", help = "Savepoint a commit")
   public String rollbackToSavepoint(
       @CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final String commitTime,
-      @CliOption(key = {"sparkProperties"}, help = "Spark Properites File Path") final String sparkPropertiesPath)
+      @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath)
       throws Exception {
     HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
     if (metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty()) {
@@ -122,7 +122,7 @@ public String rollbackToSavepoint(
   }
 
   @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
-  public String refreshMetaClient() throws IOException {
+  public String refreshMetaClient() {
     HoodieCLI.refreshTableMetadata();
     return "Metadata for table " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " refreshed.";
   }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 8ff52fad9cc1a..13d1c8b75c1c8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -124,19 +124,19 @@ public static void main(String[] args) throws Exception {
       case COMPACT_REPAIR:
         assert (args.length == 8);
         doCompactRepair(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]));
+            Boolean.parseBoolean(args[7]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_FILE:
         assert (args.length == 9);
         doCompactUnscheduleFile(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case COMPACT_UNSCHEDULE_PLAN:
         assert (args.length == 9);
         doCompactUnschedule(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Boolean.valueOf(args[7]), Boolean.valueOf(args[8]));
+            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
         returnCode = 0;
         break;
       case CLEAN:
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
index 7fc3b25c13f8a..b05aee27b4722 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java
@@ -73,7 +73,6 @@ public String writeAmplificationStats(
     HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
 
     List rows = new ArrayList<>();
-    int i = 0;
     DecimalFormat df = new DecimalFormat("#.00");
     for (HoodieInstant commitTime : timeline.getInstants().collect(Collectors.toList())) {
       String waf = "0";
@@ -94,7 +93,7 @@ public String writeAmplificationStats(
     rows.add(new Comparable[] {"Total", totalRecordsUpserted, totalRecordsWritten, waf});
 
     TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Upserted")
-        .addTableHeaderField("Total Written").addTableHeaderField("Write Amplifiation Factor");
+        .addTableHeaderField("Total Written").addTableHeaderField("Write Amplification Factor");
     return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, descending, limit, headerOnly, rows);
   }
 
@@ -120,7 +119,7 @@ public String fileSizeStats(
 
     // max, min, #small files < 10MB, 50th, avg, 95th
     Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
-    HashMap commitHistoMap = new HashMap();
+    HashMap commitHistoMap = new HashMap<>();
     for (FileStatus fileStatus : statuses) {
       String commitTime = FSUtils.getCommitTime(fileStatus.getPath().getName());
       long sz = fileStatus.getLen();
@@ -132,7 +131,6 @@ public String fileSizeStats(
     }
 
     List rows = new ArrayList<>();
-    int ind = 0;
     for (String commitTime : commitHistoMap.keySet()) {
       Snapshot s = commitHistoMap.get(commitTime).getSnapshot();
       rows.add(printFileSizeHistogram(commitTime, s));
@@ -141,7 +139,7 @@ public String fileSizeStats(
     rows.add(printFileSizeHistogram("ALL", s));
 
     Function converterFunction =
-        entry -> NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+        entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())));
     Map> fieldNameToConverterMap = new HashMap<>();
     fieldNameToConverterMap.put("Min", converterFunction);
     fieldNameToConverterMap.put("10th", converterFunction);
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index 5b5a3f54f21b2..b71a979aea437 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -24,7 +24,8 @@
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.StringUtils;
 
-import org.apache.log4j.Logger;
+import com.google.common.base.Preconditions;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
@@ -38,8 +39,7 @@
  */
 public class SparkUtil {
 
-  private static final Logger LOG = Logger.getLogger(SparkUtil.class);
-  public static final String DEFUALT_SPARK_MASTER = "yarn-client";
+  public static final String DEFAULT_SPARK_MASTER = "yarn-client";
 
   /**
    * TODO: Need to fix a bunch of hardcoded stuff here eg: history server, spark distro.
@@ -55,7 +55,7 @@ public static SparkLauncher initLauncher(String propertiesFile) throws URISyntax
       sparkLauncher.setPropertiesFile(propertiesFile);
     }
     File libDirectory = new File(new File(currentJar).getParent(), "lib");
-    for (String library : libDirectory.list()) {
+    for (String library : Preconditions.checkNotNull(libDirectory.list())) {
       sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath());
     }
     return sparkLauncher;
@@ -66,7 +66,7 @@ public static JavaSparkContext initJavaSparkConf(String name) {
     String defMasterFromEnv = sparkConf.getenv("SPARK_MASTER");
     if ((null == defMasterFromEnv) || (defMasterFromEnv.isEmpty())) {
-      sparkConf.setMaster(DEFUALT_SPARK_MASTER);
+      sparkConf.setMaster(DEFAULT_SPARK_MASTER);
     } else {
       sparkConf.setMaster(defMasterFromEnv);
     }
@@ -82,7 +82,7 @@ public static JavaSparkContext initJavaSparkConf(String name) {
     sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
 
-    sparkConf = HoodieWriteClient.registerClasses(sparkConf);
+    HoodieWriteClient.registerClasses(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean("parquet.enable.summary-metadata", false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());