diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
index 8ba0afd12ea98..7e91df155749e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -869,12 +869,11 @@ private void startCommit(String instantTime) {
     LOG.info("Generate a new instant time " + instantTime);
     HoodieTableMetaClient metaClient = createMetaClient(true);
     // if there are pending compactions, their instantTime must not be greater than that of this instant time
-    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending -> {
-      Preconditions.checkArgument(
-          HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
-          "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
-              + latestPending + ", Ingesting at " + instantTime);
-    });
+    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
+        Preconditions.checkArgument(
+            HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), instantTime, HoodieTimeline.LESSER),
+            "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+                + latestPending + ", Ingesting at " + instantTime));
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     String commitActionType = table.getMetaClient().getCommitActionType();
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index 2416eba7fbfc4..20b00f1457095 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -49,15 +49,13 @@ public class MercifulJsonConverter {
    * Build type processor map for each avro type.
    */
   private static Map getFieldTypeProcessors() {
-    Map processorMap =
-        new ImmutableMap.Builder().put(Type.STRING, generateStringTypeHandler())
-            .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler())
-            .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler())
-            .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler())
-            .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler())
-            .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler())
-            .put(Type.FIXED, generateFixedTypeHandler()).build();
-    return processorMap;
+    return new ImmutableMap.Builder().put(Type.STRING, generateStringTypeHandler())
+        .put(Type.BOOLEAN, generateBooleanTypeHandler()).put(Type.DOUBLE, generateDoubleTypeHandler())
+        .put(Type.FLOAT, generateFloatTypeHandler()).put(Type.INT, generateIntTypeHandler())
+        .put(Type.LONG, generateLongTypeHandler()).put(Type.ARRAY, generateArrayTypeHandler())
+        .put(Type.RECORD, generateRecordTypeHandler()).put(Type.ENUM, generateEnumTypeHandler())
+        .put(Type.MAP, generateMapTypeHandler()).put(Type.BYTES, generateBytesTypeHandler())
+        .put(Type.FIXED, generateFixedTypeHandler()).build();
   }
 
   /**
@@ -286,7 +284,7 @@ private static JsonToAvroFieldProcessor generateArrayTypeHandler() {
       public Pair convert(Object value, String name, Schema schema)
          throws HoodieJsonToAvroConversionException {
        Schema elementSchema = schema.getElementType();
-        List listRes = new ArrayList();
+        List listRes = new ArrayList<>();
        for (Object v : (List) value) {
          listRes.add(convertJsonToAvroField(v, name, elementSchema));
        }
@@ -301,7 +299,7 @@ private static JsonToAvroFieldProcessor generateMapTypeHandler() {
      public Pair convert(Object value, String name, Schema schema)
          throws HoodieJsonToAvroConversionException {
        Schema valueSchema = schema.getValueType();
-        Map mapRes = new HashMap();
+        Map mapRes = new HashMap<>();
        for (Map.Entry v : ((Map) value).entrySet()) {
          mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema));
        }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java
index 5468ae919803f..a98e38e284e5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/InternalDynamicBloomFilter.java
@@ -98,8 +98,7 @@ public void add(Key key) {
 
   @Override
   public void and(InternalFilter filter) {
-    if (filter == null
-        || !(filter instanceof InternalDynamicBloomFilter)
+    if (!(filter instanceof InternalDynamicBloomFilter)
         || filter.vectorSize != this.vectorSize
         || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be and-ed");
@@ -122,8 +121,8 @@ public boolean membershipTest(Key key) {
       return true;
     }
 
-    for (int i = 0; i < matrix.length; i++) {
-      if (matrix[i].membershipTest(key)) {
+    for (BloomFilter bloomFilter : matrix) {
+      if (bloomFilter.membershipTest(key)) {
         return true;
       }
     }
@@ -133,15 +132,14 @@ public boolean membershipTest(Key key) {
 
   @Override
   public void not() {
-    for (int i = 0; i < matrix.length; i++) {
-      matrix[i].not();
+    for (BloomFilter bloomFilter : matrix) {
+      bloomFilter.not();
     }
   }
 
   @Override
   public void or(InternalFilter filter) {
-    if (filter == null
-        || !(filter instanceof InternalDynamicBloomFilter)
+    if (!(filter instanceof InternalDynamicBloomFilter)
         || filter.vectorSize != this.vectorSize
         || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be or-ed");
@@ -159,8 +157,7 @@ public void or(InternalFilter filter) {
 
   @Override
   public void xor(InternalFilter filter) {
-    if (filter == null
-        || !(filter instanceof InternalDynamicBloomFilter)
+    if (!(filter instanceof InternalDynamicBloomFilter)
         || filter.vectorSize != this.vectorSize
         || filter.nbHash != this.nbHash) {
       throw new IllegalArgumentException("filters cannot be xor-ed");
@@ -180,8 +177,8 @@ public void xor(InternalFilter filter) {
   public String toString() {
     StringBuilder res = new StringBuilder();
 
-    for (int i = 0; i < matrix.length; i++) {
-      res.append(matrix[i]);
+    for (BloomFilter bloomFilter : matrix) {
+      res.append(bloomFilter);
       res.append(Character.LINE_SEPARATOR);
     }
     return res.toString();
@@ -195,8 +192,8 @@ public void write(DataOutput out) throws IOException {
     out.writeInt(nr);
     out.writeInt(currentNbRecord);
     out.writeInt(matrix.length);
-    for (int i = 0; i < matrix.length; i++) {
-      matrix[i].write(out);
+    for (BloomFilter bloomFilter : matrix) {
+      bloomFilter.write(out);
     }
   }
 
@@ -217,13 +214,9 @@ public void readFields(DataInput in) throws IOException {
    * Adds a new row to this dynamic Bloom filter.
    */
   private void addRow() {
-    org.apache.hadoop.util.bloom.BloomFilter[] tmp = new org.apache.hadoop.util.bloom.BloomFilter[matrix.length + 1];
-
-    for (int i = 0; i < matrix.length; i++) {
-      tmp[i] = matrix[i];
-    }
-
-    tmp[tmp.length - 1] = new org.apache.hadoop.util.bloom.BloomFilter(vectorSize, nbHash, hashType);
+    BloomFilter[] tmp = new BloomFilter[matrix.length + 1];
+    System.arraycopy(matrix, 0, tmp, 0, matrix.length);
+    tmp[tmp.length - 1] = new BloomFilter(vectorSize, nbHash, hashType);
     matrix = tmp;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java
index 0610e082ea22d..a5e784811f978 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bloom/filter/SimpleBloomFilter.java
@@ -41,7 +41,7 @@ public class SimpleBloomFilter implements BloomFilter {
 
-  private org.apache.hadoop.util.bloom.BloomFilter filter = null;
+  private org.apache.hadoop.util.bloom.BloomFilter filter;
 
   /**
    * Create a new Bloom filter with the given configurations.
@@ -114,8 +114,7 @@ private void writeObject(ObjectOutputStream os)
     filter.write(os);
   }
 
-  private void readObject(ObjectInputStream is)
-      throws IOException, ClassNotFoundException {
+  private void readObject(ObjectInputStream is) throws IOException {
     filter = new org.apache.hadoop.util.bloom.BloomFilter();
     filter.readFields(is);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java
index 5ddb837d30179..63ee7df2ba19e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/io/storage/HoodieWrapperFileSystem.java
@@ -114,7 +114,7 @@ public static String getHoodieScheme(String scheme) {
   }
 
   @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
+  public void initialize(URI uri, Configuration conf) {
     // Get the default filesystem to decorate
     Path path = new Path(uri);
     // Remove 'hoodie-' prefix from path
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java
index 99bc2d4d8bf30..e3e31dbdd67dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TimelineLayout.java
@@ -66,8 +66,8 @@ private static class TimelineLayoutV1 extends TimelineLayout {
     @Override
     public Stream filterHoodieInstants(Stream instantStream) {
       return instantStream.collect(Collectors.groupingBy(instant -> Pair.of(instant.getTimestamp(),
-          HoodieInstant.getComparableAction(instant.getAction())))).entrySet().stream()
-          .map(e -> e.getValue().stream().reduce((x, y) -> {
+          HoodieInstant.getComparableAction(instant.getAction())))).values().stream()
+          .map(hoodieInstants -> hoodieInstants.stream().reduce((x, y) -> {
             // Pick the one with the highest state
             if (x.getState().compareTo(y.getState()) >= 0) {
               return x;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index a7226a7e64ee0..f322d473aa4f6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -64,10 +64,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
 
   public static final SimpleDateFormat COMMIT_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmss");
   public static final Set VALID_EXTENSIONS_IN_ACTIVE_TIMELINE = new HashSet<>(Arrays.asList(
-      new String[]{COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
-          INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
-          INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
-          INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));
+      COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
+      INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
+      INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
+      INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION));
 
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
@@ -79,7 +79,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
    */
   public static String createNewInstantTime() {
     return lastInstantTime.updateAndGet((oldVal) -> {
-      String newCommitTime = null;
+      String newCommitTime;
       do {
         newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date());
       } while (HoodieTimeline.compareTimestamps(newCommitTime, oldVal, LESSER_OR_EQUAL));
@@ -255,7 +255,7 @@ public void deletePending(HoodieInstant instant) {
 
   public void deleteCompactionRequested(HoodieInstant instant) {
     Preconditions.checkArgument(instant.isRequested());
-    Preconditions.checkArgument(instant.getAction() == HoodieTimeline.COMPACTION_ACTION);
+    Preconditions.checkArgument(instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION));
     deleteInstantFile(instant);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index c61355ce35df7..78d6c6f965f1a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -65,7 +65,7 @@ public void setInstants(List instants) {
     final MessageDigest md;
     try {
       md = MessageDigest.getInstance(HASHING_ALGORITHM);
-      this.instants.stream().forEach(i -> md
+      this.instants.forEach(i -> md
          .update(StringUtils.joinUsingDelim("_", i.getTimestamp(), i.getAction(), i.getState().name()).getBytes()));
     } catch (NoSuchAlgorithmException nse) {
       throw new HoodieException(nse);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 460d0c05c49b2..460931ba626ce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -45,13 +45,13 @@ public class HoodieInstant implements Serializable, Comparable {
       .put(HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.COMMIT_ACTION).build();
 
   public static final Comparator ACTION_COMPARATOR =
-      Comparator.comparing(instant -> getComparableAction(instant.getAction()));
+      Comparator.comparing(instant -> getComparableAction(instant.getAction()));
 
   public static final Comparator COMPARATOR = Comparator.comparing(HoodieInstant::getTimestamp)
       .thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
 
-  public static final String getComparableAction(String action) {
-    return COMPARABLE_ACTIONS.containsKey(action) ? COMPARABLE_ACTIONS.get(action) : action;
+  public static String getComparableAction(String action) {
+    return COMPARABLE_ACTIONS.getOrDefault(action, action);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 2a247acb38e5b..8d50ef598a74d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -95,7 +95,7 @@ public SyncableFileSystemView getFileSystemView(String basePath) {
    * Closes all views opened.
*/ public void close() { - this.globalViewMap.values().stream().forEach(v -> v.close()); + this.globalViewMap.values().forEach(SyncableFileSystemView::close); this.globalViewMap.clear(); } @@ -196,7 +196,7 @@ public static FileSystemViewManager createViewManager(final SerializableConfigur return new FileSystemViewManager(conf, config, (basePath, viewConfig) -> { RemoteHoodieTableFileSystemView remoteFileSystemView = createRemoteFileSystemView(conf, viewConfig, new HoodieTableMetaClient(conf.newCopy(), basePath)); - SyncableFileSystemView secondaryView = null; + SyncableFileSystemView secondaryView; switch (viewConfig.getSecondaryStorageType()) { case MEMORY: secondaryView = createInMemoryFileSystemView(conf, viewConfig, basePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java index b72696a4ed7c2..4cf6942b7297d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewStorageConfig.java @@ -69,7 +69,7 @@ public FileSystemViewStorageType getStorageType() { } public boolean isIncrementalTimelineSyncEnabled() { - return Boolean.valueOf(props.getProperty(FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE)); + return Boolean.parseBoolean(props.getProperty(FILESYSTEM_VIEW_INCREMENTAL_SYNC_MODE)); } public String getRemoteViewServerHost() { @@ -87,10 +87,8 @@ public long getMaxMemoryForFileGroupMap() { public long getMaxMemoryForPendingCompaction() { long totalMemory = Long.parseLong(props.getProperty(FILESYSTEM_VIEW_SPILLABLE_MEM)); - long reservedForPendingComaction = - new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION))) - .longValue(); - return reservedForPendingComaction; + return new Double(totalMemory * Double.parseDouble(props.getProperty(FILESYSTEM_VIEW_PENDING_COMPACTION_MEM_FRACTION))) + .longValue(); } public String getBaseStoreDir() { @@ -113,12 +111,9 @@ public static class Builder { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index 1a00bca98f3ef..a937647eafe81 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -85,7 +85,7 @@ public RocksDbBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeli @Override protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { - schemaHelper.getAllColumnFamilies().stream().forEach(rocksDB::addColumnFamily); + schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily); super.init(metaClient, visibleActiveTimeline); LOG.info("Created ROCKSDB based file-system view at " + config.getRocksdbBasePath()); } @@ -98,39 +98,39 @@ protected boolean isPendingCompactionScheduledForFileId(HoodieFileGroupId fgId) @Override protected void 
resetPendingCompactionOperations(Stream> operations) { rocksDB.writeBatch(batch -> { - operations.forEach(opPair -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), - schemaHelper.getKeyForPendingCompactionLookup(opPair.getValue().getFileGroupId()), opPair); - }); + operations.forEach(opPair -> + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), + schemaHelper.getKeyForPendingCompactionLookup(opPair.getValue().getFileGroupId()), opPair) + ); LOG.info("Initializing pending compaction operations. Count=" + batch.count()); }); } @Override protected void addPendingCompactionOperations(Stream> operations) { - rocksDB.writeBatch(batch -> { - operations.forEach(opInstantPair -> { - Preconditions.checkArgument(!isPendingCompactionScheduledForFileId(opInstantPair.getValue().getFileGroupId()), - "Duplicate FileGroupId found in pending compaction operations. FgId :" - + opInstantPair.getValue().getFileGroupId()); - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), - schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId()), opInstantPair); - }); - }); + rocksDB.writeBatch(batch -> + operations.forEach(opInstantPair -> { + Preconditions.checkArgument(!isPendingCompactionScheduledForFileId(opInstantPair.getValue().getFileGroupId()), + "Duplicate FileGroupId found in pending compaction operations. FgId :" + + opInstantPair.getValue().getFileGroupId()); + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), + schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId()), opInstantPair); + }) + ); } @Override void removePendingCompactionOperations(Stream> operations) { - rocksDB.writeBatch(batch -> { - operations.forEach(opInstantPair -> { - Preconditions.checkArgument( - getPendingCompactionOperationWithInstant(opInstantPair.getValue().getFileGroupId()) != null, - "Trying to remove a FileGroupId which is not found in pending compaction operations. FgId :" - + opInstantPair.getValue().getFileGroupId()); - rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), - schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId())); - }); - }); + rocksDB.writeBatch(batch -> + operations.forEach(opInstantPair -> { + Preconditions.checkArgument( + getPendingCompactionOperationWithInstant(opInstantPair.getValue().getFileGroupId()) != null, + "Trying to remove a FileGroupId which is not found in pending compaction operations. 
FgId :" + + opInstantPair.getValue().getFileGroupId()); + rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(), + schemaHelper.getKeyForPendingCompactionLookup(opInstantPair.getValue().getFileGroupId())); + }) + ); } @Override @@ -170,17 +170,16 @@ protected void storePartitionView(String partitionPath, List fi schemaHelper.getPrefixForDataFileViewByPartition(partitionPath)); // Now add them - fileGroups.stream().forEach(fg -> { - rocksDB.writeBatch(batch -> { - fg.getAllFileSlicesIncludingInflight().forEach(fs -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); - fs.getBaseFile().ifPresent(df -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), - df); - }); - }); - }); - }); + fileGroups.forEach(fg -> + rocksDB.writeBatch(batch -> + fg.getAllFileSlicesIncludingInflight().forEach(fs -> { + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); + fs.getBaseFile().ifPresent(df -> + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), df) + ); + }) + ) + ); // record that partition is loaded. rocksDB.put(schemaHelper.getColFamilyForStoredPartitions(), lookupKey, Boolean.TRUE); @@ -194,69 +193,66 @@ protected void storePartitionView(String partitionPath, List fi */ protected void applyDeltaFileSlicesToPartitionView(String partition, List deltaFileGroups, DeltaApplyMode mode) { - rocksDB.writeBatch(batch -> { - deltaFileGroups.stream().forEach(fg -> { - fg.getAllRawFileSlices().map(fs -> { - FileSlice oldSlice = getFileSlice(partition, fs.getFileId(), fs.getBaseInstantTime()); - if (null == oldSlice) { - return fs; - } else { - // First remove the file-slice - LOG.info("Removing old Slice in DB. FS=" + oldSlice); - rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), - schemaHelper.getKeyForSliceView(fg, oldSlice)); - rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), - schemaHelper.getKeyForDataFileView(fg, oldSlice)); - - Map logFiles = oldSlice.getLogFiles() - .map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - Map deltaLogFiles = - fs.getLogFiles().map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) + rocksDB.writeBatch(batch -> + deltaFileGroups.forEach(fg -> + fg.getAllRawFileSlices().map(fs -> { + FileSlice oldSlice = getFileSlice(partition, fs.getFileId(), fs.getBaseInstantTime()); + if (null == oldSlice) { + return fs; + } else { + // First remove the file-slice + LOG.info("Removing old Slice in DB. 
FS=" + oldSlice); + rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, oldSlice)); + rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, oldSlice)); + + Map logFiles = oldSlice.getLogFiles() + .map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - switch (mode) { - case ADD: { - FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); - oldSlice.getBaseFile().ifPresent(df -> newFileSlice.setBaseFile(df)); - fs.getBaseFile().ifPresent(df -> newFileSlice.setBaseFile(df)); - Map newLogFiles = new HashMap<>(logFiles); - deltaLogFiles.entrySet().stream().filter(e -> !logFiles.containsKey(e.getKey())) - .forEach(p -> newLogFiles.put(p.getKey(), p.getValue())); - newLogFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf)); - LOG.info("Adding back new File Slice after add FS=" + newFileSlice); - return newFileSlice; - } - case REMOVE: { - LOG.info("Removing old File Slice =" + fs); - FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); - fs.getBaseFile().orElseGet(() -> { - oldSlice.getBaseFile().ifPresent(df -> newFileSlice.setBaseFile(df)); - return null; - }); - - deltaLogFiles.keySet().stream().forEach(p -> logFiles.remove(p)); - // Add remaining log files back - logFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf)); - if (newFileSlice.getBaseFile().isPresent() || (newFileSlice.getLogFiles().count() > 0)) { - LOG.info("Adding back new file-slice after remove FS=" + newFileSlice); - return newFileSlice; + Map deltaLogFiles = + fs.getLogFiles().map(lf -> Pair.of(Path.getPathWithoutSchemeAndAuthority(lf.getPath()).toString(), lf)) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + switch (mode) { + case ADD: { + FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); + oldSlice.getBaseFile().ifPresent(newFileSlice::setBaseFile); + fs.getBaseFile().ifPresent(newFileSlice::setBaseFile); + Map newLogFiles = new HashMap<>(logFiles); + deltaLogFiles.entrySet().stream().filter(e -> !logFiles.containsKey(e.getKey())) + .forEach(p -> newLogFiles.put(p.getKey(), p.getValue())); + newLogFiles.values().forEach(newFileSlice::addLogFile); + LOG.info("Adding back new File Slice after add FS=" + newFileSlice); + return newFileSlice; + } + case REMOVE: { + LOG.info("Removing old File Slice =" + fs); + FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime()); + fs.getBaseFile().orElseGet(() -> { + oldSlice.getBaseFile().ifPresent(newFileSlice::setBaseFile); + return null; + }); + + deltaLogFiles.keySet().forEach(logFiles::remove); + // Add remaining log files back + logFiles.values().forEach(newFileSlice::addLogFile); + if (newFileSlice.getBaseFile().isPresent() || (newFileSlice.getLogFiles().count() > 0)) { + LOG.info("Adding back new file-slice after remove FS=" + newFileSlice); + return newFileSlice; + } + return null; + } + default: + throw new IllegalStateException("Unknown diff apply mode=" + mode); } - return null; } - default: - throw new IllegalStateException("Unknown diff apply mode=" + mode); - } - } - }).filter(Objects::nonNull).forEach(fs -> { - rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); - fs.getBaseFile().ifPresent(df -> { - 
rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), - df); - }); - }); - }); - }); + }).filter(Objects::nonNull).forEach(fs -> { + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForSliceView(fg, fs), fs); + fs.getBaseFile().ifPresent(df -> + rocksDB.putInBatch(batch, schemaHelper.getColFamilyForView(), schemaHelper.getKeyForDataFileView(fg, fs), df) + ); + }) + ) + ); } @Override @@ -330,7 +326,7 @@ private Stream getFileGroups(Stream sliceStream) { private FileSlice getFileSlice(String partitionPath, String fileId, String instantTime) { String key = schemaHelper.getKeyForSliceView(partitionPath, fileId, instantTime); - return rocksDB.get(schemaHelper.getColFamilyForView(), key); + return rocksDB.get(schemaHelper.getColFamilyForView(), key); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 3ada17e95d1eb..c806523b1ae65 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -106,8 +106,6 @@ Stream> fetchPendingCompactionOperations() { @Override public Stream fetchAllStoredFileGroups() { - return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream().flatMap(fg -> { - return ((List) fg).stream(); - }); + return ((ExternalSpillableMap) partitionToFileGroupsMap).valueStream().flatMap(fg -> ((List) fg).stream()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java index a856926dbaae9..1962894639894 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroUtils.java @@ -42,7 +42,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -58,7 +58,7 @@ public static HoodieRestoreMetadata convertRestoreMetadata(String startRestoreTi ImmutableMap.Builder> commitToStatBuilder = ImmutableMap.builder(); for (Map.Entry> commitToStat : commitToStats.entrySet()) { commitToStatBuilder.put(commitToStat.getKey(), - Arrays.asList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); + Collections.singletonList(convertRollbackMetadata(startRestoreTime, durationInMs, commits, commitToStat.getValue()))); } return new HoodieRestoreMetadata(startRestoreTime, durationInMs.orElseGet(() -> -1L), commits, commitToStatBuilder.build(), DEFAULT_VERSION); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java index 416208b8bd359..25fe7b08080f3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BufferedRandomAccessFile.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -59,7 +59,7 @@ public final class BufferedRandomAccessFile extends RandomAccessFile { private static final Logger LOG = Logger.getLogger(BufferedRandomAccessFile.class); static final int DEFAULT_BUFFER_SIZE = (1 << 16); // 64K buffer - static final int BUFFER_BOUNDARY_MASK = ~(DEFAULT_BUFFER_SIZE - 1); + static final int BUFFER_BOUNDARY_MASK = -DEFAULT_BUFFER_SIZE; private int capacity; private ByteBuffer dataBuffer; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index 84b7ee4c50cad..1d32b6406c9b9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -55,10 +55,9 @@ public static HoodieCleanMetadata convertCleanMetadata(HoodieTableMetaClient met } } - HoodieCleanMetadata metadata = new HoodieCleanMetadata(startCleanTime, + return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, partitionMetadataBuilder.build(), CLEAN_METADATA_VERSION_2); - return metadata; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index 40988b49d9058..c5ce52673d642 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -90,7 +90,7 @@ public static HoodieCompactionPlan buildFromFileSlices(List> extraMetadata, Option, Map>> metricsCaptureFunction) { HoodieCompactionPlan.Builder builder = HoodieCompactionPlan.newBuilder(); - extraMetadata.ifPresent(m -> builder.setExtraMetadata(m)); + extraMetadata.ifPresent(builder::setExtraMetadata); builder.setOperations(partitionFileSlicePairs.stream() .map(pfPair -> buildFromFileSlice(pfPair.getKey(), pfPair.getValue(), metricsCaptureFunction)) @@ -157,25 +157,23 @@ public static Map> ge Map> fgIdToPendingCompactionWithInstantMap = new HashMap<>(); - pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> { - return getPendingCompactionOperations(instantPlanPair.getKey(), instantPlanPair.getValue()); - }).forEach(pair -> { - // Defensive check to ensure a single-fileId does not have more than one pending compaction with different - // file slices. If we find a full duplicate we assume it is caused by eventual nature of the move operation - // on some DFSs. - if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) { - HoodieCompactionOperation operation = pair.getValue().getValue(); - HoodieCompactionOperation anotherOperation = - fgIdToPendingCompactionWithInstantMap.get(pair.getKey()).getValue(); - - if (!operation.equals(anotherOperation)) { - String msg = "Hudi File Id (" + pair.getKey() + ") has more than 1 pending compactions. Instants: " - + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey()); - throw new IllegalStateException(msg); - } - } - fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue()); - }); + pendingCompactionPlanWithInstants.stream().flatMap(instantPlanPair -> + getPendingCompactionOperations(instantPlanPair.getKey(), instantPlanPair.getValue())).forEach(pair -> { + // Defensive check to ensure a single-fileId does not have more than one pending compaction with different + // file slices. 
If we find a full duplicate we assume it is caused by eventual nature of the move operation + // on some DFSs. + if (fgIdToPendingCompactionWithInstantMap.containsKey(pair.getKey())) { + HoodieCompactionOperation operation = pair.getValue().getValue(); + HoodieCompactionOperation anotherOperation = fgIdToPendingCompactionWithInstantMap.get(pair.getKey()).getValue(); + + if (!operation.equals(anotherOperation)) { + String msg = "Hudi File Id (" + pair.getKey() + ") has more than 1 pending compactions. Instants: " + + pair.getValue() + ", " + fgIdToPendingCompactionWithInstantMap.get(pair.getKey()); + throw new IllegalStateException(msg); + } + } + fgIdToPendingCompactionWithInstantMap.put(pair.getKey(), pair.getValue()); + }); return fgIdToPendingCompactionWithInstantMap; } @@ -183,10 +181,8 @@ public static Stream ops = compactionPlan.getOperations(); if (null != ops) { - return ops.stream().map(op -> { - return Pair.of(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()), - Pair.of(instant.getTimestamp(), op)); - }); + return ops.stream().map(op -> Pair.of(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()), + Pair.of(instant.getTimestamp(), op))); } else { return Stream.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java index 152e3f7e73f78..8a5017af1d676 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConsistencyGuardConfig.java @@ -78,12 +78,9 @@ public static class Builder { private final Properties props = new Properties(); public Builder fromFile(File propertiesFile) throws IOException { - FileReader reader = new FileReader(propertiesFile); - try { + try (FileReader reader = new FileReader(propertiesFile)) { props.load(reader); return this; - } finally { - reader.close(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java index d9161e5129538..43b0030e58737 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java @@ -70,12 +70,7 @@ public class FSUtils { private static final long MIN_ROLLBACK_TO_KEEP = 10; private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_"; - private static final PathFilter ALLOW_ALL_FILTER = new PathFilter() { - @Override - public boolean accept(Path file) { - return true; - } - }; + private static final PathFilter ALLOW_ALL_FILTER = file -> true; public static Configuration prepareHadoopConf(Configuration conf) { conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); @@ -93,7 +88,7 @@ public static Configuration prepareHadoopConf(Configuration conf) { public static FileSystem getFs(String path, Configuration conf) { FileSystem fs; - conf = prepareHadoopConf(conf); + prepareHadoopConf(conf); try { fs = new Path(path).getFileSystem(conf); } catch (IOException e) { @@ -226,8 +221,7 @@ static void processFiles(FileSystem fs, String basePathStr, Function { - if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) { - return false; - } - return true; - }; + return (path) -> !path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME); } public static String getInstantTime(String name) { @@ -396,17 +385,14 @@ public static String makeLogFileName(String 
fileId, String logFileExtension, Str public static boolean isLogFile(Path logPath) { Matcher matcher = LOG_FILE_PATTERN.matcher(logPath.getName()); - if (!matcher.find()) { - return false; - } - return true; + return matcher.find(); } /** * Get the latest log file written from the list of log files passed in. */ public static Option getLatestLogFile(Stream logFiles) { - return Option.fromJavaOptional(logFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).findFirst()); + return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator())); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java index b4a099179ec40..70ceed0f9f08e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java @@ -81,7 +81,7 @@ public void waitTillAllFilesDisappear(String dirPath, List files) throws public void waitForFilesVisibility(String dirPath, List files, FileVisibility event) throws TimeoutException { Path dir = new Path(dirPath); List filesWithoutSchemeAndAuthority = - files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(p -> p.toString()) + files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(Path::toString) .collect(Collectors.toList()); retryTillSuccess((retryNum) -> { @@ -89,7 +89,7 @@ public void waitForFilesVisibility(String dirPath, List files, FileVisib LOG.info("Trying " + retryNum); FileStatus[] entries = fs.listStatus(dir); List gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath())) - .map(p -> p.toString()).collect(Collectors.toList()); + .map(Path::toString).collect(Collectors.toList()); List candidateFiles = new ArrayList<>(filesWithoutSchemeAndAuthority); boolean altered = candidateFiles.removeAll(gotFiles); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java index d030ce810ff4f..02e48e3a4e6ac 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java @@ -43,6 +43,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -84,7 +85,7 @@ public static byte[] avroToBytes(GenericRecord record) throws IOException { public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOException { BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get()); reuseDecoder.set(decoder); - GenericDatumReader reader = new GenericDatumReader(schema); + GenericDatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, decoder); } @@ -141,7 +142,7 @@ private static Schema initRecordKeySchema() { Schema.Field recordKeyField = new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance()); Schema recordKeySchema = Schema.createRecord("HoodieRecordKey", "", "", false); - recordKeySchema.setFields(Arrays.asList(recordKeyField)); + recordKeySchema.setFields(Collections.singletonList(recordKeyField)); return recordKeySchema; } @@ -166,9 +167,8 @@ public static 
GenericRecord addHoodieKeyToRecord(GenericRecord record, String re * @param newFieldNames Null Field names to be added */ public static Schema appendNullSchemaFields(Schema schema, List newFieldNames) { - List newFields = schema.getFields().stream().map(field -> { - return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()); - }).collect(Collectors.toList()); + List newFields = schema.getFields().stream() + .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue())).collect(Collectors.toList()); for (String newField : newFieldNames) { newFields.add(new Schema.Field(newField, METADATA_FIELD_SCHEMA, "", NullNode.getInstance())); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java index 649396d362222..52b30cbc24e12 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LogReaderUtils.java @@ -49,7 +49,7 @@ private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActive HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); while (reader.hasPrev()) { HoodieLogBlock block = reader.prev(); - if (block instanceof HoodieAvroDataBlock && block != null) { + if (block instanceof HoodieAvroDataBlock) { HoodieAvroDataBlock lastBlock = (HoodieAvroDataBlock) block; if (completedTimeline .containsOrBeforeTimelineStarts(lastBlock.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME))) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java index b11ac6c1d1491..d39600044f80f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java @@ -132,7 +132,7 @@ public ClassSizeInfo load(Class clazz) { private final Set alreadyVisited = Sets.newIdentityHashSet(); - private final Deque pending = new ArrayDeque(16 * 1024); + private final Deque pending = new ArrayDeque<>(16 * 1024); private long size; /** @@ -268,7 +268,7 @@ private class ClassSizeInfo { public ClassSizeInfo(Class clazz) { long fieldsSize = 0; - final List referenceFields = new LinkedList(); + final List referenceFields = new LinkedList<>(); for (Field f : clazz.getDeclaredFields()) { if (Modifier.isStatic(f.getModifiers())) { continue; @@ -303,9 +303,7 @@ public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) { try { calc.enqueue(f.get(obj)); } catch (IllegalAccessException e) { - final AssertionError ae = new AssertionError("Unexpected denial of access to " + f); - ae.initCause(e); - throw ae; + throw new AssertionError("Unexpected denial of access to " + f, e); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java index 59af74bf2e5a2..0115ec2b92d13 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.rocksdb.AbstractImmutableNativeReference; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; 
import org.rocksdb.ColumnFamilyOptions; @@ -61,13 +62,11 @@ public class RocksDBDAO { private transient ConcurrentHashMap managedDescriptorMap; private transient RocksDB rocksDB; private boolean closed = false; - private final String basePath; private final String rocksDBBasePath; public RocksDBDAO(String basePath, String rocksDBBasePath) { - this.basePath = basePath; this.rocksDBBasePath = - String.format("%s/%s/%s", rocksDBBasePath, this.basePath.replace("/", "_"), UUID.randomUUID().toString()); + String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID().toString()); init(); } @@ -153,14 +152,11 @@ private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFam * Perform a batch write operation. */ public void writeBatch(BatchHandler handler) { - WriteBatch batch = new WriteBatch(); - try { + try (WriteBatch batch = new WriteBatch()) { handler.apply(batch); getRocksDB().write(new WriteOptions(), batch); } catch (RocksDBException re) { throw new HoodieException(re); - } finally { - batch.close(); } } @@ -442,9 +438,7 @@ public void dropColumnFamily(String columnFamilyName) { public synchronized void close() { if (!closed) { closed = true; - managedHandlesMap.values().forEach(columnFamilyHandle -> { - columnFamilyHandle.close(); - }); + managedHandlesMap.values().forEach(AbstractImmutableNativeReference::close); managedHandlesMap.clear(); managedDescriptorMap.clear(); getRocksDB().close(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java index 1c17e83346e4c..9096080bb0f6e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java @@ -41,7 +41,7 @@ public class SerializationUtils { // Caching kryo serializer to avoid creating kryo instance for every serde operation private static final ThreadLocal SERIALIZER_REF = - ThreadLocal.withInitial(() -> new KryoSerializerInstance()); + ThreadLocal.withInitial(KryoSerializerInstance::new); // Serialize // ----------------------------------------------------------------------- @@ -99,7 +99,7 @@ private static class KryoSerializerInstance implements Serializable { kryo.setRegistrationRequired(false); } - byte[] serialize(Object obj) throws IOException { + byte[] serialize(Object obj) { kryo.reset(); baos.reset(); Output output = new Output(baos); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java index fd211e0f7cffc..38dcaaaf3fe26 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBBasedMap.java @@ -84,11 +84,7 @@ public R remove(Object key) { @Override public void putAll(Map m) { - getRocksDBDAO().writeBatch(batch -> { - m.entrySet().forEach(entry -> { - getRocksDBDAO().putInBatch(batch, columnFamilyName, entry.getKey(), entry.getValue()); - }); - }); + getRocksDBDAO().writeBatch(batch -> m.forEach((key, value) -> getRocksDBDAO().putInBatch(batch, columnFamilyName, key, value))); } private RocksDBDAO getRocksDBDAO() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java index 
2c5ce5d9782a1..7ea6d5e1b197e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java @@ -73,7 +73,7 @@ public class BoundedInMemoryQueue implements Iterable { // it holds the root cause of the exception in case either queueing records (consuming from // inputIterator) fails or // thread reading records from queue fails. - private final AtomicReference hasFailed = new AtomicReference(null); + private final AtomicReference hasFailed = new AtomicReference<>(null); // used for indicating that all the records from queue are read successfully. private final AtomicBoolean isReadDone = new AtomicBoolean(false); // used for indicating that all records have been enqueued @@ -222,7 +222,7 @@ private Option readNextRecord() { /** * Puts an empty entry to queue to denote termination. */ - public void close() throws InterruptedException { + public void close() { // done queueing records notifying queue-reader. isWriteDone.set(true); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java index 6f27dbce6c5d3..71890b19640f9 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/HoodieTestUtils.java @@ -70,6 +70,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -125,7 +126,7 @@ public static String makeNewCommitTime() { return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()); } - public static final void createCommitFiles(String basePath, String... commitTimes) throws IOException { + public static void createCommitFiles(String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" @@ -139,11 +140,11 @@ public static final void createCommitFiles(String basePath, String... commitTime } } - public static final void createMetadataFolder(String basePath) { + public static void createMetadataFolder(String basePath) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME).mkdirs(); } - public static final void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException { + public static void createInflightCommitFiles(String basePath, String... commitTimes) throws IOException { for (String commitTime : commitTimes) { new File(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" @@ -153,7 +154,7 @@ public static final void createInflightCommitFiles(String basePath, String... co } } - public static final void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... commitTimes) { + public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... 
commitTimes) { for (String commitTime : commitTimes) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(commitTime), HoodieTimeline.makeInflightCleanerFileName(commitTime)).forEach(f -> { @@ -180,19 +181,19 @@ public static final void createPendingCleanFiles(HoodieTableMetaClient metaClien } } - public static final String createNewDataFile(String basePath, String partitionPath, String commitTime) + public static String createNewDataFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString(); return createDataFile(basePath, partitionPath, commitTime, fileID); } - public static final String createNewMarkerFile(String basePath, String partitionPath, String commitTime) + public static String createNewMarkerFile(String basePath, String partitionPath, String commitTime) throws IOException { String fileID = UUID.randomUUID().toString(); return createMarkerFile(basePath, partitionPath, commitTime, fileID); } - public static final String createDataFile(String basePath, String partitionPath, String commitTime, String fileID) + public static String createDataFile(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; new File(folderPath).mkdirs(); @@ -200,7 +201,7 @@ public static final String createDataFile(String basePath, String partitionPath, return fileID; } - public static final String createMarkerFile(String basePath, String partitionPath, String commitTime, String fileID) + public static String createMarkerFile(String basePath, String partitionPath, String commitTime, String fileID) throws IOException { String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + commitTime + "/" + partitionPath + "/"; @@ -210,7 +211,7 @@ public static final String createMarkerFile(String basePath, String partitionPat return f.getAbsolutePath(); } - public static final String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime, + public static String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime, String fileID, Option version) throws IOException { String folderPath = basePath + "/" + partitionPath + "/"; boolean makeDir = fs.mkdirs(new Path(folderPath)); @@ -226,7 +227,7 @@ public static final String createNewLogFile(FileSystem fs, String basePath, Stri return fileID; } - public static final void createCompactionCommitFiles(FileSystem fs, String basePath, String... commitTimes) + public static void createCompactionCommitFiles(FileSystem fs, String basePath, String... 
commitTimes) throws IOException { for (String commitTime : commitTimes) { boolean createFile = fs.createNewFile(new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" @@ -237,7 +238,7 @@ public static final void createCompactionCommitFiles(FileSystem fs, String baseP } } - public static final void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, + public static void createCompactionRequest(HoodieTableMetaClient metaClient, String instant, List> fileSliceList) throws IOException { HoodieCompactionPlan plan = CompactionUtils.buildFromFileSlices(fileSliceList, Option.empty(), Option.empty()); HoodieInstant compactionInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, instant); @@ -245,47 +246,47 @@ public static final void createCompactionRequest(HoodieTableMetaClient metaClien AvroUtils.serializeCompactionPlan(plan)); } - public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) { + public static String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) { return basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, DEFAULT_WRITE_TOKEN, fileID); } - public static final String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID, + public static String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID, Option version) { return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_LOG_VERSION), HoodieLogFormat.UNKNOWN_WRITE_TOKEN); } - public static final String getCommitFilePath(String basePath, String commitTime) { + public static String getCommitFilePath(String basePath, String commitTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION; } - public static final String getInflightCommitFilePath(String basePath, String commitTime) { + public static String getInflightCommitFilePath(String basePath, String commitTime) { return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } - public static final String getRequestedCompactionFilePath(String basePath, String commitTime) { + public static String getRequestedCompactionFilePath(String basePath, String commitTime) { return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + commitTime + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION; } - public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, + public static boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, String fileID) { return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists(); } - public static final boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID, + public static boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID, Option version) { return new File(getLogFilePath(basePath, partitionPath, commitTime, fileID, version)).exists(); } - public static final boolean doesCommitExist(String basePath, String commitTime) { + public static boolean doesCommitExist(String basePath, String commitTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION) .exists(); } - public 
static final boolean doesInflightExist(String basePath, String commitTime) { + public static boolean doesInflightExist(String basePath, String commitTime) { return new File( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.INFLIGHT_EXTENSION) .exists(); @@ -298,19 +299,16 @@ public static void createCleanFiles(HoodieTableMetaClient metaClient, String bas Path commitFile = new Path( basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(commitTime)); FileSystem fs = FSUtils.getFs(basePath, configuration); - FSDataOutputStream os = fs.create(commitFile, true); - try { + try (FSDataOutputStream os = fs.create(commitFile, true)) { HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), commitTime); // Create the clean metadata HoodieCleanMetadata cleanMetadata = - CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Arrays.asList(cleanStats)); + CleanerUtils.convertCleanMetadata(metaClient, commitTime, Option.of(0L), Collections.singletonList(cleanStats)); // Write empty clean metadata os.write(AvroUtils.serializeCleanMetadata(cleanMetadata).get()); - } finally { - os.close(); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index ab1d95ed88bf8..68646231efabc 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; -import com.google.common.collect.Lists; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; @@ -34,6 +33,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.Arrays; import java.util.stream.Collectors; import static org.junit.Assert.assertArrayEquals; @@ -120,7 +120,7 @@ public void checkArchiveCommitTimeline() throws IOException { HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2"); HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3"); - assertEquals(Lists.newArrayList(instant1, instant2, instant3), + assertEquals(Arrays.asList(instant1, instant2, instant3), archivedTimeline.getInstants().collect(Collectors.toList())); assertArrayEquals(new Text("data1").getBytes(), archivedTimeline.getInstantDetails(instant1).get()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java index 652cca92680f5..1b9667c1e7cbc 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java @@ -86,7 +86,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { private FileSystem fs; private Path partitionPath; private int bufferSize = 4096; - private Boolean readBlocksLazily = true; + private Boolean readBlocksLazily; public TestHoodieLogFormat(Boolean readBlocksLazily) { this.readBlocksLazily = readBlocksLazily; diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index fc05e2f015907..2b8f04fff9225 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -209,7 +209,7 @@ public void testViewForFileSlicesWithBaseFileAndInflightCompaction() throws Exce * @return */ private Stream getAllRawFileSlices(String partitionPath) { - return fsView.getAllFileGroups(partitionPath).map(group -> group.getAllFileSlicesIncludingInflight()) + return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getAllFileSlicesIncludingInflight) .flatMap(sliceList -> sliceList); } @@ -220,7 +220,7 @@ private Stream getAllRawFileSlices(String partitionPath) { * @return */ public Stream getLatestRawFileSlices(String partitionPath) { - return fsView.getAllFileGroups(partitionPath).map(fileGroup -> fileGroup.getLatestFileSlicesIncludingInflight()) + return fsView.getAllFileGroups(partitionPath).map(HoodieFileGroup::getLatestFileSlicesIncludingInflight) .filter(fileSliceOpt -> fileSliceOpt.isPresent()).map(Option::get); } @@ -275,7 +275,7 @@ protected void testViewForFileSlicesWithAsyncCompaction(boolean skipCreatingData partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0))); HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs, Option.empty(), Option.empty()); - HoodieInstant compactionInstant = null; + HoodieInstant compactionInstant; if (isCompactionInFlight) { // Create a Data-file but this should be skipped by view new File(basePath + "/" + partitionPath + "/" + compactDataFileName).createNewFile(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 7a4efccb621a1..e38625a3a4cbf 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -62,6 +62,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -115,13 +116,13 @@ public void testAsyncCompaction() throws IOException { unscheduleCompaction(view, "14", "13", "11"); // Add one more delta instant - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("15"), true, "11")); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("15"), true, "11")); // Schedule Compaction again scheduleCompaction(view, "16"); // Run Compaction - This will be the second file-slice - testMultipleWriteSteps(view, Arrays.asList("16"), false, "16", 2); + testMultipleWriteSteps(view, Collections.singletonList("16"), false, "16", 2); // Run 2 more ingest instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("17", "18"), true, "16", 2)); @@ -130,25 +131,25 @@ public void testAsyncCompaction() throws IOException { scheduleCompaction(view, "19"); // Run one more ingestion after pending compaction. 
THis will be 3rd slice - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("20"), true, "19", 3)); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("20"), true, "19", 3)); // Clean first slice - testCleans(view, Arrays.asList("21"), + testCleans(view, Collections.singletonList("21"), new ImmutableMap.Builder>().put("11", Arrays.asList("12", "13", "15")).build(), - instantsToFiles, Arrays.asList("11")); + instantsToFiles, Collections.singletonList("11")); // Add one more ingestion instant. This should be 2nd slice now - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("22"), true, "19", 2)); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("22"), true, "19", 2)); // Restore last ingestion - testRestore(view, Arrays.asList("23"), true, new HashMap<>(), Arrays.asList("22"), "24", false); + testRestore(view, Collections.singletonList("23"), true, new HashMap<>(), Collections.singletonList("22"), "24", false); // Run one more ingestion. THis is still 2nd slice - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("24"), true, "19", 2)); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("24"), true, "19", 2)); // Finish Compaction - instantsToFiles.putAll(testMultipleWriteSteps(view, Arrays.asList("19"), false, "19", 2, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "24")))); + instantsToFiles.putAll(testMultipleWriteSteps(view, Collections.singletonList("19"), false, "19", 2, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "24")))); } @Test @@ -198,13 +199,13 @@ public void testMultipleTransitions() throws IOException { SyncableFileSystemView view1 = getFileSystemView(metaClient); view1.sync(); - Map> instantsToFiles = null; + Map> instantsToFiles; /** * Case where incremental syncing is catching up on more than one ingestion at a time */ // Run 1 ingestion on MOR table (1 delta commits). View1 is now sync up to this point - instantsToFiles = testMultipleWriteSteps(view1, Arrays.asList("11"), true, "11"); + instantsToFiles = testMultipleWriteSteps(view1, Collections.singletonList("11"), true, "11"); SyncableFileSystemView view2 = getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath())); @@ -213,7 +214,7 @@ public void testMultipleTransitions() throws IOException { instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11")); // Now Sync view1 and add 1 more ingestion. 
Check if view1 is able to catchup correctly - instantsToFiles.putAll(testMultipleWriteSteps(view1, Arrays.asList("14"), true, "11")); + instantsToFiles.putAll(testMultipleWriteSteps(view1, Collections.singletonList("14"), true, "11")); view2.sync(); SyncableFileSystemView view3 = @@ -238,8 +239,8 @@ public void testMultipleTransitions() throws IOException { scheduleCompaction(view2, "16"); instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("17", "18"), true, "16", 2)); // Compaction - testMultipleWriteSteps(view2, Arrays.asList("16"), false, "16", 2, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "18"))); + testMultipleWriteSteps(view2, Collections.singletonList("16"), false, "16", 2, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "18"))); view1.sync(); areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2); SyncableFileSystemView view5 = @@ -249,14 +250,14 @@ public void testMultipleTransitions() throws IOException { /** * Case where a clean happened and then rounds of ingestion and compaction happened */ - testCleans(view2, Arrays.asList("19"), + testCleans(view2, Collections.singletonList("19"), new ImmutableMap.Builder>().put("11", Arrays.asList("12", "13", "14")).build(), - instantsToFiles, Arrays.asList("11")); + instantsToFiles, Collections.singletonList("11")); scheduleCompaction(view2, "20"); instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("21", "22"), true, "20", 2)); // Compaction - testMultipleWriteSteps(view2, Arrays.asList("20"), false, "20", 2, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "22"))); + testMultipleWriteSteps(view2, Collections.singletonList("20"), false, "20", 2, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "22"))); // Run one more round of ingestion instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("23", "24"), true, "20", 2)); view1.sync(); @@ -268,14 +269,14 @@ public void testMultipleTransitions() throws IOException { /** * Case where multiple restores and ingestions happened */ - testRestore(view2, Arrays.asList("25"), true, new HashMap<>(), Arrays.asList("24"), "29", true); - testRestore(view2, Arrays.asList("26"), true, new HashMap<>(), Arrays.asList("23"), "29", false); - instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("27"), true, "20", 2)); + testRestore(view2, Collections.singletonList("25"), true, new HashMap<>(), Collections.singletonList("24"), "29", true); + testRestore(view2, Collections.singletonList("26"), true, new HashMap<>(), Collections.singletonList("23"), "29", false); + instantsToFiles.putAll(testMultipleWriteSteps(view2, Collections.singletonList("27"), true, "20", 2)); scheduleCompaction(view2, "28"); - instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("29"), true, "28", 3)); + instantsToFiles.putAll(testMultipleWriteSteps(view2, Collections.singletonList("29"), true, "28", 3)); // Compaction - testMultipleWriteSteps(view2, Arrays.asList("28"), false, "28", 3, - Arrays.asList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "29"))); + testMultipleWriteSteps(view2, Collections.singletonList("28"), false, "28", 3, + Collections.singletonList(new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "29"))); Arrays.asList(view1, view2, view3, view4, view5, view6).forEach(v -> { v.sync(); @@ -371,8 
+372,7 @@ private void testRestore(SyncableFileSystemView view, List newRestoreIns LOG.info("Last Instant is :" + view.getLastInstant().get()); if (isRestore) { Assert.assertEquals(newRestoreInstants.get(idx), view.getLastInstant().get().getTimestamp()); - Assert.assertEquals(isRestore ? HoodieTimeline.RESTORE_ACTION : HoodieTimeline.ROLLBACK_ACTION, - view.getLastInstant().get().getAction()); + Assert.assertEquals(HoodieTimeline.RESTORE_ACTION, view.getLastInstant().get().getAction()); } Assert.assertEquals(State.COMPLETED, view.getLastInstant().get().getState()); @@ -532,9 +532,7 @@ private void unscheduleCompaction(SyncableFileSystemView view, String compaction view.sync(); Assert.assertEquals(newLastInstant, view.getLastInstant().get().getTimestamp()); - partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> { - Assert.assertEquals(newBaseInstant, fs.getBaseInstantTime()); - })); + partitions.forEach(p -> view.getLatestFileSlices(p).forEach(fs -> Assert.assertEquals(newBaseInstant, fs.getBaseInstantTime()))); } /** @@ -618,17 +616,11 @@ private Map> testMultipleWriteSteps(SyncableFileSystemView final long expTotalFileSlicesPerPartition = fileIdsPerPartition.size() * multiple; partitions.forEach(p -> Assert.assertEquals(expTotalFileSlicesPerPartition, view.getAllFileSlices(p).count())); if (deltaCommit) { - partitions.forEach(p -> { - view.getLatestFileSlices(p).forEach(f -> { - Assert.assertEquals(baseInstantForDeltaCommit, f.getBaseInstantTime()); - }); - }); + partitions.forEach(p -> + view.getLatestFileSlices(p).forEach(f -> Assert.assertEquals(baseInstantForDeltaCommit, f.getBaseInstantTime())) + ); } else { - partitions.forEach(p -> { - view.getLatestBaseFiles(p).forEach(f -> { - Assert.assertEquals(instant, f.getCommitTime()); - }); - }); + partitions.forEach(p -> view.getLatestBaseFiles(p).forEach(f -> Assert.assertEquals(instant, f.getCommitTime()))); } metaClient.reloadActiveTimeline(); @@ -704,23 +696,21 @@ private void areViewsConsistent(SyncableFileSystemView view1, SyncableFileSystem private List addInstant(HoodieTableMetaClient metaClient, String instant, boolean deltaCommit, String baseInstant) throws IOException { - List> writeStats = partitions.stream().flatMap(p -> { - return fileIdsPerPartition.stream().map(f -> { - try { - File file = new File(basePath + "/" + p + "/" - + (deltaCommit - ? FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant), TEST_WRITE_TOKEN) - : FSUtils.makeDataFileName(instant, TEST_WRITE_TOKEN, f))); - file.createNewFile(); - HoodieWriteStat w = new HoodieWriteStat(); - w.setFileId(f); - w.setPath(String.format("%s/%s", p, file.getName())); - return Pair.of(p, w); - } catch (IOException e) { - throw new HoodieException(e); - } - }); - }).collect(Collectors.toList()); + List> writeStats = partitions.stream().flatMap(p -> fileIdsPerPartition.stream().map(f -> { + try { + File file = new File(basePath + "/" + p + "/" + + (deltaCommit + ? 
FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant), TEST_WRITE_TOKEN) + : FSUtils.makeDataFileName(instant, TEST_WRITE_TOKEN, f))); + file.createNewFile(); + HoodieWriteStat w = new HoodieWriteStat(); + w.setFileId(f); + w.setPath(String.format("%s/%s", p, file.getName())); + return Pair.of(p, w); + } catch (IOException e) { + throw new HoodieException(e); + } + })).collect(Collectors.toList()); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); writeStats.forEach(e -> metadata.addWriteStat(e.getKey(), e.getValue())); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java index b919d4236cbf6..3b010ca6b3c21 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestFSUtils.java @@ -229,8 +229,7 @@ public void testOldLogFilesComparison() { String log1Ver1 = makeOldLogFileName("file1", ".log", "1", 1); String log1base2 = makeOldLogFileName("file1", ".log", "2", 0); List logFiles = Stream.of(log1base2, log1Ver1, log1Ver0).map(HoodieLogFile::new) - .collect(Collectors.toList()); - logFiles.sort(HoodieLogFile.getLogFileComparator()); + .sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); assertEquals(log1Ver0, logFiles.get(0).getFileName()); assertEquals(log1Ver1, logFiles.get(1).getFileName()); assertEquals(log1base2, logFiles.get(2).getFileName()); @@ -250,8 +249,7 @@ public void testLogFilesComparison() { List logFiles = Stream.of(log1Ver1W1, log1base2W0, log1base2W1, log1Ver1W0, log1Ver0W1, log1Ver0W0) - .map(HoodieLogFile::new).collect(Collectors.toList()); - logFiles.sort(HoodieLogFile.getLogFileComparator()); + .map(HoodieLogFile::new).sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList()); assertEquals(log1Ver0W0, logFiles.get(0).getFileName()); assertEquals(log1Ver0W1, logFiles.get(1).getFileName()); assertEquals(log1Ver1W0, logFiles.get(2).getFileName()); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java index ba8e042134767..f65e4f11efb4b 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -77,8 +78,7 @@ public static List getIncrementalTableNames(JobContext job) { return (!matcher.find() ? null : matcher.group(1)); } return null; - }).filter(s -> s != null) - .collect(Collectors.toList()); + }).filter(Objects::nonNull).collect(Collectors.toList()); if (result == null) { // Returns an empty list instead of null. 
result = new ArrayList<>(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java index e7eb99014adac..eaf28fc0fb55a 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -48,6 +49,7 @@ import org.apache.hudi.common.table.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -93,9 +95,7 @@ public FileStatus[] listStatus(JobConf job) throws IOException { if (nonHoodiePaths.size() > 0) { setInputPaths(job, nonHoodiePaths.toArray(new Path[nonHoodiePaths.size()])); FileStatus[] fileStatuses = super.listStatus(job); - for (FileStatus fileStatus: fileStatuses) { - returns.add(fileStatus); - } + returns.addAll(Arrays.asList(fileStatuses)); } // process snapshot queries next. @@ -133,8 +133,7 @@ private List listStatusForIncrementalMode( .getInstants().collect(Collectors.toList()); // Extract partitions touched by the commitsToCheck Set partitionsToList = new HashSet<>(); - for (int i = 0; i < commitsToCheck.size(); i++) { - HoodieInstant commit = commitsToCheck.get(i); + for (HoodieInstant commit : commitsToCheck) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); partitionsToList.addAll(commitMetadata.getPartitionToWriteStats().keySet()); @@ -171,15 +170,14 @@ private List listStatusForIncrementalMode( return false; }) .collect(Collectors.joining(",")); - if (incrementalInputPaths == null || incrementalInputPaths.isEmpty()) { + if (StringUtils.isNullOrEmpty(incrementalInputPaths)) { return null; } // Mutate the JobConf to set the input paths to only partitions touched by incremental pull. setInputPaths(job, incrementalInputPaths); FileStatus[] fileStatuses = super.listStatus(job); - BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, - fileStatuses); - List commitsList = commitsToCheck.stream().map(s -> s.getTimestamp()).collect(Collectors.toList()); + BaseFileOnlyView roView = new HoodieTableFileSystemView(tableMetaClient, timeline, fileStatuses); + List commitsList = commitsToCheck.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList()); List filteredFiles = roView.getLatestBaseFilesInRange(commitsList).collect(Collectors.toList()); List returns = new ArrayList<>(); for (HoodieBaseFile filteredFile : filteredFiles) { @@ -200,7 +198,7 @@ private List listStatusForIncrementalMode( * @throws IOException */ private Map> groupFileStatusForSnapshotPaths( - FileStatus[] fileStatuses, Collection metaClientList) throws IOException { + FileStatus[] fileStatuses, Collection metaClientList) { // This assumes the paths for different tables are grouped together Map> grouped = new HashMap<>(); HoodieTableMetaClient metadata = null; @@ -231,8 +229,8 @@ private Map> groupFileStatusForSnapshotP * Filters data files for a snapshot queried table. 
*/ private List filterFileStatusForSnapshotMode( - HoodieTableMetaClient metadata, List fileStatuses) throws IOException { - FileStatus[] statuses = fileStatuses.toArray(new FileStatus[fileStatuses.size()]); + HoodieTableMetaClient metadata, List fileStatuses) { + FileStatus[] statuses = fileStatuses.toArray(new FileStatus[0]); if (LOG.isDebugEnabled()) { LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata); } @@ -258,7 +256,7 @@ private List filterFileStatusForSnapshotMode( * super.listStatus() and gets back a FileStatus[] 2. Then it creates the HoodieTableMetaClient for the paths listed. * 3. Generation of splits looks at FileStatus size to create splits, which skips this file */ - private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) throws IOException { + private HoodieBaseFile checkFileStatus(HoodieBaseFile dataFile) { Path dataPath = dataFile.getFileStatus().getPath(); try { if (dataFile.getFileSize() == 0) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index cd1cea32eef88..506b6cfe2d08c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -65,11 +65,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -115,7 +116,7 @@ public CheckNonCombinablePathCallable(Path[] paths, int start, int length, JobCo @Override public Set call() throws Exception { - Set nonCombinablePathIndices = new HashSet(); + Set nonCombinablePathIndices = new HashSet<>(); for (int i = 0; i < length; i++) { PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, paths[i + start], IOPrepareCache.get().allocatePartitionDescMap()); @@ -356,25 +357,21 @@ private InputSplit[] getCombineSplits(JobConf job, int numSplits, Map result = new ArrayList(); + ArrayList result = new ArrayList<>(); // combine splits only from same tables and same partitions. Do not combine splits from multiple // tables or multiple partitions. 
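Several hunks in this patch, including filterFileStatusForSnapshotMode just above, swap collection.toArray(new T[collection.size()]) for collection.toArray(new T[0]). Passing a zero-length array lets the collection allocate a correctly sized result itself and reads more simply on modern JVMs. A minimal standalone sketch of the pattern (the class name and sample data are illustrative, not taken from Hudi):

    import java.util.Arrays;
    import java.util.List;

    public class ToArrayExample {
        public static void main(String[] args) {
            List<String> partitions = Arrays.asList("2019/01/01", "2019/01/02", "2019/01/03");

            // Passing a zero-length array: the list allocates an array of the right size
            // internally; the result is equivalent to toArray(new String[partitions.size()]).
            String[] asArray = partitions.toArray(new String[0]);

            System.out.println(Arrays.toString(asArray));
        }
    }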
Path[] paths = StringInternUtils.internUriStringsInPathArray(combine.getInputPathsShim(job)); - List inpDirs = new ArrayList(); - List inpFiles = new ArrayList(); - Map poolMap = new HashMap(); - Set poolSet = new HashSet(); + List inpDirs = new ArrayList<>(); + List inpFiles = new ArrayList<>(); + Map poolMap = new HashMap<>(); + Set poolSet = new HashSet<>(); for (Path path : paths) { PartitionDesc part = HiveFileFormatUtils.getPartitionDescFromPathRecursively(pathToPartitionInfo, path, @@ -414,8 +411,8 @@ private InputSplit[] getCombineSplits(JobConf job, int numSplits, Map> opList = null; + CombineFilter f; + List> opList; if (!mrwork.isMapperCannotSpanPartns()) { // if mapper can span partitions, make sure a splits does not contain multiple @@ -441,7 +438,7 @@ private InputSplit[] getCombineSplits(JobConf job, int numSplits, Map iss = new ArrayList(); + List iss = new ArrayList<>(); if (!mrwork.isMapperCannotSpanPartns()) { // mapper can span partitions // combine into as few as one split, subject to the PathFilters set @@ -496,14 +493,14 @@ public Set getNonCombinablePathIndices(JobConf job, Path[] paths, int n int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads); ExecutorService executor = Executors.newFixedThreadPool(numThreads); - List>> futureList = new ArrayList>>(numThreads); + List>> futureList = new ArrayList<>(numThreads); try { for (int i = 0; i < numThreads; i++) { int start = i * numPathPerThread; int length = i != numThreads - 1 ? numPathPerThread : paths.length - start; futureList.add(executor.submit(new CheckNonCombinablePathCallable(paths, start, length, job))); } - Set nonCombinablePathIndices = new HashSet(); + Set nonCombinablePathIndices = new HashSet<>(); for (Future> future : futureList) { nonCombinablePathIndices.addAll(future.get()); } @@ -522,12 +519,12 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); init(job); - ArrayList result = new ArrayList(); + List result = new ArrayList<>(); Path[] paths = getInputPaths(job); - List nonCombinablePaths = new ArrayList(paths.length / 2); - List combinablePaths = new ArrayList(paths.length / 2); + List nonCombinablePaths = new ArrayList<>(paths.length / 2); + List combinablePaths = new ArrayList<>(paths.length / 2); int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); @@ -561,22 +558,18 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // Process the normal splits if (nonCombinablePaths.size() > 0) { - FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new Path[nonCombinablePaths.size()])); + FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new Path[0])); InputSplit[] splits = super.getSplits(job, numSplits); - for (InputSplit split : splits) { - result.add(split); - } + Collections.addAll(result, splits); } // Process the combine splits if (combinablePaths.size() > 0) { - FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[combinablePaths.size()])); + FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0])); Map pathToPartitionInfo = this.pathToPartitionInfo != null ? 
this.pathToPartitionInfo : Utilities.getMapWork(job).getPathToPartitionInfo(); InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); - for (InputSplit split : splits) { - result.add(split); - } + Collections.addAll(result, splits); } // Restore the old path information back @@ -634,8 +627,8 @@ Path[] getInputPaths(JobConf job) throws IOException { */ private List sampleSplits(List splits) { HashMap nameToSamples = mrwork.getNameToSplitSample(); - List retLists = new ArrayList(); - Map> aliasToSplitList = new HashMap>(); + List retLists = new ArrayList<>(); + Map> aliasToSplitList = new HashMap<>(); Map> pathToAliases = mrwork.getPathToAliases(); Map> pathToAliasesNoScheme = removeScheme(pathToAliases); @@ -651,7 +644,7 @@ private List sampleSplits(List splits) { // 1. it serves more than one alias // 2. the alias it serves is not sampled // 3. it serves different alias than another path for the same split - if (l.size() != 1 || !nameToSamples.containsKey(l.get(0)) || (alias != null && l.get(0) != alias)) { + if (l.size() != 1 || !nameToSamples.containsKey(l.get(0)) || (alias != null && !Objects.equals(l.get(0), alias))) { alias = null; break; } @@ -662,7 +655,7 @@ private List sampleSplits(List splits) { // split exclusively serves alias, which needs to be sampled // add it to the split list of the alias. if (!aliasToSplitList.containsKey(alias)) { - aliasToSplitList.put(alias, new ArrayList()); + aliasToSplitList.put(alias, new ArrayList<>()); } aliasToSplitList.get(alias).add(split); } else { @@ -727,7 +720,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter repo CombineHiveInputSplit hsplit = (CombineHiveInputSplit) split; String inputFormatClassName = null; - Class inputFormatClass = null; + Class inputFormatClass; try { inputFormatClassName = hsplit.inputFormatClassName(); inputFormatClass = job.getClassByName(inputFormatClassName); @@ -743,7 +736,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter repo static class CombineFilter implements PathFilter { - private final Set pStrings = new HashSet(); + private final Set pStrings = new HashSet<>(); // store a path prefix in this TestFilter // PRECONDITION: p should always be a directory @@ -764,7 +757,7 @@ public void addPath(Path p) { @Override public boolean accept(Path path) { boolean find = false; - while (path != null && !find) { + while (path != null) { if (pStrings.contains(path.toUri().getPath())) { find = true; break; @@ -838,19 +831,12 @@ protected List listStatus(JobContext job) throws IOException { input = new HoodieParquetInputFormat(); } input.setConf(job.getConfiguration()); - result = new ArrayList(Arrays.asList(input.listStatus(new JobConf(job.getConfiguration())))); + result = new ArrayList<>(Arrays.asList(input.listStatus(new JobConf(job.getConfiguration())))); } else { result = super.listStatus(job); } - Iterator it = result.iterator(); - - while (it.hasNext()) { - FileStatus stat = (FileStatus) it.next(); - if (!stat.isFile()) { - it.remove(); - } - } + result.removeIf(stat -> !stat.isFile()); return result; } @@ -870,12 +856,12 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti } InputSplit[] splits = super.getSplits(job, numSplits); - ArrayList inputSplitShims = new ArrayList(); + List inputSplitShims = new ArrayList<>(); - for (int pos = 0; pos < splits.length; ++pos) { - CombineFileSplit split = (CombineFileSplit) splits[pos]; + for (InputSplit inputSplit : splits) { + CombineFileSplit split = 
(CombineFileSplit) inputSplit; if (split.getPaths().length > 0) { - inputSplitShims.add(new HadoopShimsSecure.InputSplitShim(job, split.getPaths(), split.getStartOffsets(), + inputSplitShims.add(new InputSplitShim(job, split.getPaths(), split.getStartOffsets(), split.getLengths(), split.getLocations())); } } @@ -884,7 +870,7 @@ public CombineFileSplit[] getSplits(JobConf job, int numSplits) throws IOExcepti } @Override - public HadoopShimsSecure.InputSplitShim getInputSplitShim() throws IOException { + public HadoopShimsSecure.InputSplitShim getInputSplitShim() { return new HadoopShimsSecure.InputSplitShim(); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index 1bfc446e0e11f..df635d6184a20 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -78,7 +78,7 @@ public abstract class AbstractRealtimeRecordReader { // Property to set the max memory for dfs inputstream buffer size public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = "hoodie.memory.dfs.buffer.max.size"; // Setting this to lower value of 1 MB since no control over how many RecordReaders will be started in a mapper - public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1 * 1024 * 1024; // 1 MB + public static final int DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE = 1024 * 1024; // 1 MB // Property to set file path prefix for spillable file public static final String SPILLABLE_MAP_BASE_PATH_PROP = "hoodie.memory.spillable.map.path"; // Default file path prefix for spillable file @@ -170,18 +170,12 @@ private static List orderFields(String fieldNameCsv, String fieldOrderCs // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L188} // Field Names -> {@link https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/serde/src/java // /org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java#L229} - Set fieldOrdersSet = new LinkedHashSet<>(); String[] fieldOrdersWithDups = fieldOrderCsv.split(","); - for (String fieldOrder : fieldOrdersWithDups) { - fieldOrdersSet.add(fieldOrder); - } - String[] fieldOrders = fieldOrdersSet.toArray(new String[fieldOrdersSet.size()]); + Set fieldOrdersSet = new LinkedHashSet<>(Arrays.asList(fieldOrdersWithDups)); + String[] fieldOrders = fieldOrdersSet.toArray(new String[0]); List fieldNames = Arrays.stream(fieldNameCsv.split(",")) .filter(fn -> !partitioningFields.contains(fn)).collect(Collectors.toList()); - Set fieldNamesSet = new LinkedHashSet<>(); - for (String fieldName : fieldNames) { - fieldNamesSet.add(fieldName); - } + Set fieldNamesSet = new LinkedHashSet<>(fieldNames); // Hive does not provide ids for partitioning fields, so check for lengths excluding that. 
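The orderFields hunk above replaces the hand-rolled de-duplication loops with LinkedHashSet constructors: new LinkedHashSet<>(Arrays.asList(...)) drops duplicate entries while keeping first-seen order, so the resulting field ordering is unchanged. A small self-contained sketch of that behavior (the CSV value is made up for illustration):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class DedupPreservingOrder {
        public static void main(String[] args) {
            String fieldOrderCsv = "0,1,1,2,0"; // illustrative input with duplicates

            // LinkedHashSet removes duplicates but preserves insertion order,
            // matching the removed for-loop that added elements one at a time.
            Set<String> fieldOrders = new LinkedHashSet<>(Arrays.asList(fieldOrderCsv.split(",")));

            System.out.println(Arrays.toString(fieldOrders.toArray(new String[0]))); // [0, 1, 2]
        }
    }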
if (fieldNamesSet.size() != fieldOrders.length) { throw new HoodieException(String @@ -189,7 +183,7 @@ private static List orderFields(String fieldNameCsv, String fieldOrderCs fieldNames.size(), fieldOrders.length)); } TreeMap orderedFieldMap = new TreeMap<>(); - String[] fieldNamesArray = fieldNamesSet.toArray(new String[fieldNamesSet.size()]); + String[] fieldNamesArray = fieldNamesSet.toArray(new String[0]); for (int ox = 0; ox < fieldOrders.length; ox++) { orderedFieldMap.put(Integer.parseInt(fieldOrders[ox]), fieldNamesArray[ox]); } @@ -402,7 +396,7 @@ public Schema getHiveSchema() { public long getMaxCompactionMemoryInBytes() { // jobConf.getMemoryForMapTask() returns in MB return (long) Math - .ceil(Double.valueOf(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION)) + .ceil(Double.parseDouble(jobConf.get(COMPACTION_MEMORY_FRACTION_PROP, DEFAULT_COMPACTION_MEMORY_FRACTION)) * jobConf.getMemoryForMapTask() * 1024 * 1024L); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index 35d969e8de2b6..bc717a91e93a4 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -108,7 +108,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { // for all unique split parents, obtain all delta files based on delta commit timeline, // grouped on file id List rtSplits = new ArrayList<>(); - partitionsToParquetSplits.keySet().stream().forEach(partitionPath -> { + partitionsToParquetSplits.keySet().forEach(partitionPath -> { // for each partition path obtain the data & log file groupings, then map back to inputsplits HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath); HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline()); @@ -149,7 +149,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { } }); LOG.info("Returning a total splits of " + rtSplits.size()); - return rtSplits.toArray(new InputSplit[rtSplits.size()]); + return rtSplits.toArray(new InputSplit[0]); } @Override diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java index cb8606e187221..c4b79cb10c59d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java @@ -48,7 +48,7 @@ public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job, } public static boolean canSkipMerging(JobConf jobConf) { - return Boolean.valueOf(jobConf.get(REALTIME_SKIP_MERGE_PROP, DEFAULT_REALTIME_SKIP_MERGE)); + return Boolean.parseBoolean(jobConf.get(REALTIME_SKIP_MERGE_PROP, DEFAULT_REALTIME_SKIP_MERGE)); } /** diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 45e88376bfc4a..46bc1b366d34c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -64,7 +64,7 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept split.getDeltaLogPaths(), usesCustomPayload ? getWriterSchema() : getReaderSchema(), split.getMaxCommitTime(), getMaxCompactionMemoryInBytes(), Boolean - .valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH)); } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index 96172bf1b493a..3091188b89196 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -79,7 +79,7 @@ public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf job, this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf), split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(), Boolean - .valueOf(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), + .parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)), false, jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> { // convert Hoodie log record to Hadoop AvroWritable and buffer GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get(); @@ -93,7 +93,6 @@ public RealtimeUnmergedRecordReader(HoodieRealtimeFileSplit split, JobConf job, /** * Setup log and parquet reading in parallel. Both write to central buffer. */ - @SuppressWarnings("unchecked") private List> getParallelProducers() { List> producers = new ArrayList<>(); producers.add(new FunctionBasedQueueProducer<>(buffer -> { @@ -105,7 +104,7 @@ private List> getParallelProducers() } @Override - public boolean next(NullWritable key, ArrayWritable value) throws IOException { + public boolean next(NullWritable key, ArrayWritable value) { if (!iterator.hasNext()) { return false; } @@ -125,7 +124,7 @@ public ArrayWritable createValue() { } @Override - public long getPos() throws IOException { + public long getPos() { // TODO: vb - No logical way to represent parallel stream pos in a single long. // Should we just return invalid (-1). Where is it used ? 
return 0; diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java index 405549b03e1d6..559a573d5cc4c 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputFormatTestUtil.java @@ -33,12 +33,12 @@ import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.UUID; public class InputFormatTestUtil { @@ -59,13 +59,10 @@ public static File prepareTable(TemporaryFolder basePath, int numberOfFiles, Str public static void simulateUpdates(File directory, final String originalCommit, int numberOfFilesUpdated, String newCommit, boolean randomize) throws IOException { - List dataFiles = Arrays.asList(directory.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - String commitTs = FSUtils.getCommitTime(name); - return originalCommit.equals(commitTs); - } - })); + List dataFiles = Arrays.asList(Objects.requireNonNull(directory.listFiles((dir, name) -> { + String commitTs = FSUtils.getCommitTime(name); + return originalCommit.equals(commitTs); + }))); if (randomize) { Collections.shuffle(dataFiles); } @@ -183,16 +180,10 @@ private static Iterable generateAvroRecords(Schema sche public static void simulateParquetUpdates(File directory, Schema schema, String originalCommit, int totalNumberOfRecords, int numberOfRecordsToUpdate, String newCommit) throws IOException { - File fileToUpdate = directory.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith("parquet"); - } - })[0]; + File fileToUpdate = Objects.requireNonNull(directory.listFiles((dir, name) -> name.endsWith("parquet")))[0]; String fileId = FSUtils.getFileId(fileToUpdate.getName()); File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId)); - AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema); - try { + try (AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema)) { for (GenericRecord record : generateAvroRecords(schema, totalNumberOfRecords, originalCommit, fileId)) { if (numberOfRecordsToUpdate > 0) { // update this record @@ -203,8 +194,6 @@ public boolean accept(File dir, String name) { } parquetWriter.write(record); } - } finally { - parquetWriter.close(); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java index 307c18203be16..4dfaea9118c47 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/InputPathHandlerTest.java @@ -157,7 +157,7 @@ static List generatePartitions(DistributedFileSystem dfs, String basePath) @Test public void testInputPathHandler() throws IOException { inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray( - new Path[inputPaths.size()]), incrementalTables); + new Path[0]), incrementalTables); List actualPaths = inputPathHandler.getGroupedIncrementalPaths().values().stream() .flatMap(List::stream).collect(Collectors.toList()); 
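The InputFormatTestUtil.simulateParquetUpdates hunk above, like createCleanFiles earlier in this patch, moves an explicitly closed resource into a try-with-resources block, so the writer is closed even when the loop inside throws, without a separate finally clause. A generic sketch of the same pattern using a plain JDK writer (the temp file and contents are illustrative):

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class TryWithResourcesExample {
        public static void main(String[] args) throws IOException {
            Path out = Files.createTempFile("example-", ".txt"); // illustrative temp file

            // The writer is AutoCloseable: it is closed automatically when the block exits,
            // normally or via an exception, replacing try { ... } finally { close(); }.
            try (BufferedWriter writer = Files.newBufferedWriter(out)) {
                writer.write("record-1");
                writer.newLine();
            }

            System.out.println("wrote " + Files.size(out) + " bytes to " + out);
        }
    }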
assertTrue(actualComparesToExpected(actualPaths, incrementalPaths)); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java index ed501e70030d2..0f15f4568ae23 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java @@ -122,7 +122,7 @@ private void createCommitFile(TemporaryFolder basePath, String commitNumber, Str throws IOException { List writeStats = HoodieTestUtils.generateFakeHoodieWriteStat(1); HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); - writeStats.stream().forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); + writeStats.forEach(stat -> commitMetadata.addWriteStat(partitionPath, stat)); File file = new File(basePath.getRoot().toString() + "/.hoodie/", commitNumber + ".commit"); file.createNewFile(); FileOutputStream fileOutputStream = new FileOutputStream(file); @@ -221,8 +221,8 @@ public void testGetIncrementalTableNames() throws IOException { String defaultmode = String.format(HoodieHiveUtil.HOODIE_CONSUME_MODE_PATTERN, "db3.first_trips"); conf.set(defaultmode, HoodieHiveUtil.DEFAULT_SCAN_MODE); List actualincrTables = HoodieHiveUtil.getIncrementalTableNames(Job.getInstance(conf)); - for (int i = 0; i < expectedincrTables.length; i++) { - assertTrue(actualincrTables.contains(expectedincrTables[i])); + for (String expectedincrTable : expectedincrTables) { + assertTrue(actualincrTables.contains(expectedincrTable)); } } diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java index aaadebeadcfef..89b71684b64a9 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java @@ -68,7 +68,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -167,9 +167,9 @@ public void testNonPartitionedReader() throws Exception { private void setHiveColumnNameProps(List fields, JobConf jobConf, boolean isPartitioned) { String names = fields.stream().map(Field::name).collect(Collectors.joining(",")); - String postions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); + String positions = fields.stream().map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names); - jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, postions); + jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions); String hiveOrderedColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase(PARTITION_COLUMN)) .map(Field::name).collect(Collectors.joining(",")); @@ -286,7 +286,7 @@ public void testUnMergedReader() throws Exception { String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1, jobConf), - basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime); + 
basePath.getRoot().getPath(), Collections.singletonList(logFilePath), newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( @@ -361,7 +361,7 @@ public void testReaderWithNestedAndComplexSchema() throws Exception { String logFilePath = writer.getLogFile().getPath().toString(); HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit( new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1, jobConf), - basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime); + basePath.getRoot().getPath(), Collections.singletonList(logFilePath), newCommitTime); // create a RecordReader to be used by HoodieRealtimeRecordReader RecordReader reader = new MapredParquetInputFormat().getRecordReader( diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 8684f3bd9289b..bc976b0ea450f 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -80,7 +80,7 @@ public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { } } - public void syncHoodieTable() throws ClassNotFoundException { + public void syncHoodieTable() { try { switch (hoodieHiveClient.getTableType()) { case COPY_ON_WRITE: @@ -193,7 +193,7 @@ private List filterPartitions(List events, PartitionEven .collect(Collectors.toList()); } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { // parse the params final HiveSyncConfig cfg = new HiveSyncConfig(); JCommander cmd = new JCommander(cfg, null, args); diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 4578bb2fa4b90..208173828609d 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -18,7 +18,6 @@ package org.apache.hudi.hive; -import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieLogFile; @@ -41,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; @@ -65,7 +65,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -190,7 +189,7 @@ private String getPartitionClause(String partition) { for (int i = 0; i < syncConfig.partitionFields.size(); i++) { partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'"); } - return partBuilder.stream().collect(Collectors.joining(",")); + return String.join(",", partBuilder); } private List constructChangePartitions(String tableName, List partitions) { @@ -500,7 +499,7 @@ public void updateHiveSQL(String s) { * @param sql SQL statement to execute */ public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) throws 
HoodieHiveSyncException { - List responses = updateHiveSQLs(Arrays.asList(sql)); + List responses = updateHiveSQLs(Collections.singletonList(sql)); return responses.get(responses.size() - 1); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java index 39ac694842ddb..4a227c6c95273 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java @@ -87,7 +87,7 @@ static String[] getHiveConsoleCommand(String rawCommand) { cmd.add("hive.stats.autogather=false"); cmd.add("-e"); cmd.add("\"" + fullCommand + "\""); - return cmd.stream().toArray(String[]::new); + return cmd.toArray(new String[0]); } private static String getHiveConsoleCommandFile(String commandFile, String additionalVar) { diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index af8bb1697039c..f7aa67ea2e4a4 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -43,7 +43,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -127,7 +127,7 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco } public static void checkRequiredProperties(TypedProperties props, List checkPropNames) { - checkPropNames.stream().forEach(prop -> { + checkPropNames.forEach(prop -> { if (!props.containsKey(prop)) { throw new HoodieNotSupportedException("Required property " + prop + " is missing"); } @@ -182,19 +182,13 @@ public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable order @SuppressWarnings("unchecked") public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRDD incomingHoodieRecords, HoodieWriteConfig writeConfig, Option timelineService) { - HoodieReadClient client = null; - try { - client = new HoodieReadClient<>(jssc, writeConfig, timelineService); + try (HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService)) { return client.tagLocation(incomingHoodieRecords) .filter(r -> !((HoodieRecord) r).isCurrentLocationKnown()); } catch (TableNotFoundException e) { // this will be executed when there is no hoodie table yet // so no dups to drop return incomingHoodieRecords; - } finally { - if (null != client) { - client.close(); - } } } @@ -207,12 +201,12 @@ public static JavaRDD dropDuplicates(JavaSparkContext jssc, JavaRD } public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) { - checkRequiredProperties(props, Arrays.asList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY())); + checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY())); HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(); hiveSyncConfig.basePath = basePath; hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY(), - Boolean.valueOf(DataSourceWriteOptions.DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL())); + Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL())); hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), DataSourceWriteOptions.DEFAULT_HIVE_DATABASE_OPT_VAL()); 
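The updateHiveSQLUsingHiveDriver and buildHiveSyncConfig hunks above pass Collections.singletonList(...) where a one-element Arrays.asList(...) was used. Both yield a fixed-size list, but the singleton form states the single-element intent directly, skips the varargs array allocation, and is additionally immutable. A tiny sketch (the property key is illustrative, not a confirmed Hudi config name):

    import java.util.Collections;
    import java.util.List;

    public class SingletonListExample {
        public static void main(String[] args) {
            // Immutable single-element list; no varargs array is allocated.
            List<String> required = Collections.singletonList("hoodie.datasource.hive_sync.table"); // illustrative key

            required.forEach(prop -> System.out.println("checking required property: " + prop));
        }
    }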
hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 16c2d7548680c..decab9c640028 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -52,7 +52,7 @@ object AvroConversionUtils { } def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = { - df.rdd.map(row => (new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))) + df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField))) } def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = { @@ -67,7 +67,7 @@ object AvroConversionUtils { val convertor = AvroConversionHelper.createConverterToRow(schema, dataType) records.map { x => convertor(x).asInstanceOf[Row] } } - }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))).asInstanceOf[Dataset[Row]] + }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))) } } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 23a757db9fb9e..931f23bc2b86e 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -63,7 +63,7 @@ class DefaultSource extends RelationProvider sqlContext.sparkContext.hadoopConfiguration.setClass( "mapreduce.input.pathFilter.class", classOf[HoodieROTablePathFilter], - classOf[org.apache.hadoop.fs.PathFilter]); + classOf[org.apache.hadoop.fs.PathFilter]) log.info("Constructing hoodie (as parquet) data source with options :" + parameters) log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. 
" + diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d4d48f960e317..598e5cd292bcb 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -77,7 +77,7 @@ private[hudi] object HoodieSparkSqlWriter { val jsc = new JavaSparkContext(sparkContext) val basePath = new Path(parameters("path")) - val commitTime = HoodieActiveTimeline.createNewInstantTime(); + val commitTime = HoodieActiveTimeline.createNewInstantTime() val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) @@ -282,7 +282,7 @@ private[hudi] object HoodieSparkSqlWriter { client.close() commitSuccess && syncHiveSucess } else { - log.error(s"$operation failed with ${errorCount} errors :"); + log.error(s"$operation failed with $errorCount errors :") if (log.isTraceEnabled) { log.trace("Printing out the top 100 errors") writeStatuses.rdd.filter(ws => ws.hasErrors) diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index 31a04d691f9ea..5a307d3bad4c2 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -31,7 +31,7 @@ import org.scalatest.junit.AssertionsForJUnit class TestDataSourceDefaults extends AssertionsForJUnit { val schema = SchemaTestUtil.getComplexEvolvedSchema - var baseRecord: GenericRecord = null + var baseRecord: GenericRecord = _ @Before def initialize(): Unit = { baseRecord = SchemaTestUtil @@ -60,10 +60,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new SimpleKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // recordkey field not specified try { @@ -72,10 +71,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new SimpleKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // nested field as record key and partition path val hk2 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.isAdmin", "false")) @@ -89,14 +87,13 @@ class TestDataSourceDefaults extends AssertionsForJUnit { .getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: HoodieException => // do nothing - } - }; + } // if partition path can't be found, return default partition path val hk3 = new SimpleKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false")) - .getKey(baseRecord); + .getKey(baseRecord) assertEquals("default", hk3.getPartitionPath) // if enable hive style partitioning @@ -155,10 +152,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new ComplexKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // recordkey field not specified try { @@ -167,10 +163,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new ComplexKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: 
IllegalArgumentException => // do nothing - } - }; + } // nested field as record key and partition path val hk2 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId,testNestedRecord.isAdmin", "testNestedRecord.userId,testNestedRecord.isAdmin", "false")) @@ -184,14 +179,13 @@ class TestDataSourceDefaults extends AssertionsForJUnit { .getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: HoodieException => // do nothing - } - }; + } // if partition path can't be found, return default partition path val hk3 = new ComplexKeyGenerator(getKeyConfig("testNestedRecord.userId", "testNestedRecord.notThere", "false")) - .getKey(baseRecord); + .getKey(baseRecord) assertEquals("default", hk3.getPartitionPath) // if enable hive style partitioning @@ -269,10 +263,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { new GlobalDeleteKeyGenerator(props).getKey(baseRecord) fail("Should have errored out") } catch { - case e: IllegalArgumentException => { + case e: IllegalArgumentException => // do nothing - } - }; + } // Nested record key not found try { @@ -280,10 +273,9 @@ class TestDataSourceDefaults extends AssertionsForJUnit { .getKey(baseRecord) fail("Should have errored out") } catch { - case e: HoodieException => { + case e: HoodieException => // do nothing - } - }; + } // if all parts of the composite record key are null/empty, throw error try { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java index a25b129addb6f..a807e05f59ad0 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java @@ -350,8 +350,7 @@ public void handle(@NotNull Context context) throws Exception { } finally { long endTs = System.currentTimeMillis(); long timeTakenMillis = endTs - beginTs; - LOG - .info(String.format( + LOG.info(String.format( "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " + "Success=%s, Query=%s, Host=%s, synced=%s", timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java index 856e6824728b1..2cfc914546d43 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java @@ -56,6 +56,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -81,7 +82,7 @@ public HDFSParquetImporter(Config cfg) { this.cfg = cfg; } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { @@ -160,8 +161,7 @@ protected JavaRDD> buildHoodieRecordsForImport AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new Schema.Parser().parse(schemaStr))); ParquetInputFormat.setReadSupportClass(job, (AvroReadSupport.class)); - return jsc - .newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, + return 
jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class, job.getConfiguration()) // To reduce large number of tasks. .coalesce(16 * cfg.parallelism).map(entry -> { @@ -198,7 +198,7 @@ protected JavaRDD> buildHoodieRecordsForImport * @param Type */ protected JavaRDD load(HoodieWriteClient client, String instantTime, - JavaRDD> hoodieRecords) throws Exception { + JavaRDD> hoodieRecords) { switch (cfg.command.toLowerCase()) { case "upsert": { return client.upsert(hoodieRecords, instantTime); @@ -227,7 +227,7 @@ public void validate(String name, String value) throws ParameterException { public static class FormatValidator implements IValueValidator { - List validFormats = Arrays.asList("parquet"); + List validFormats = Collections.singletonList("parquet"); @Override public void validate(String name, String value) throws ParameterException { @@ -241,7 +241,7 @@ public void validate(String name, String value) throws ParameterException { public static class Config implements Serializable { @Parameter(names = {"--command", "-c"}, description = "Write command Valid values are insert(default)/upsert/bulkinsert", - required = false, validateValueWith = CommandValidator.class) + validateValueWith = CommandValidator.class) public String command = "INSERT"; @Parameter(names = {"--src-path", "-sp"}, description = "Base path for the input table", required = true) public String srcPath = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java index b08517c4e7129..7d356a554dbcb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java @@ -62,7 +62,6 @@ public class HiveIncrementalPuller { private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class); - private static String driverName = "org.apache.hive.jdbc.HiveDriver"; public static class Config implements Serializable { @@ -97,6 +96,7 @@ public static class Config implements Serializable { } static { + String driverName = "org.apache.hive.jdbc.HiveDriver"; try { Class.forName(driverName); } catch (ClassNotFoundException e) { @@ -219,8 +219,7 @@ private void initHiveBeelineProperties(Statement stmt) throws SQLException { // Set the from commit time executeStatement("set hoodie." + config.sourceTable + ".consume.start.timestamp=" + config.fromCommitTime, stmt); // Set number of commits to pull - executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + String.valueOf(config.maxCommits), - stmt); + executeStatement("set hoodie." + config.sourceTable + ".consume.max.commits=" + config.maxCommits, stmt); } private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException { @@ -233,14 +232,14 @@ private void executeStatement(String sql, Statement stmt) throws SQLException { stmt.execute(sql); } - private String inferCommitTime(FileSystem fs) throws SQLException, IOException { + private String inferCommitTime(FileSystem fs) throws IOException { LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie table " + config.targetDb + "." 
+ config.targetTable); String targetDataLocation = getTableLocation(config.targetDb, config.targetTable); return scanForCommitTime(fs, targetDataLocation); } - private String getTableLocation(String db, String table) throws SQLException { + private String getTableLocation(String db, String table) { ResultSet resultSet = null; Statement stmt = null; try { @@ -309,7 +308,7 @@ private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) throw return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); } - private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) throws IOException { + private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) { HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation); List commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() .findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java index 13b712552444a..11f44e1438ac8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java @@ -31,7 +31,6 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -45,11 +44,6 @@ public class HoodieCleaner { */ private final Config cfg; - /** - * Filesystem used. - */ - private transient FileSystem fs; - /** * Spark context. */ @@ -60,22 +54,25 @@ public class HoodieCleaner { */ private TypedProperties props; - public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException { + public HoodieCleaner(Config cfg, JavaSparkContext jssc) { this.cfg = cfg; this.jssc = jssc; - this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); + /* + * Filesystem used. + */ + FileSystem fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration()); this.props = cfg.propsFilePath == null ? 
UtilHelpers.buildProperties(cfg.configs) : UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig(); LOG.info("Creating Cleaner with configs : " + props.toString()); } - public void run() throws Exception { + public void run() { HoodieWriteConfig hoodieCfg = getHoodieClientConfig(); HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, false); client.clean(); } - private HoodieWriteConfig getHoodieClientConfig() throws Exception { + private HoodieWriteConfig getHoodieClientConfig() { return HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.basePath).withAutoCommit(false) .withProps(props).build(); } @@ -101,7 +98,7 @@ public static class Config implements Serializable { public Boolean help = false; } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java index 87e6cecddf28f..3634362fbfe6b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java @@ -60,8 +60,7 @@ public static void main(String[] args) throws Exception { */ public void run(JavaSparkContext jsc) throws Exception { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath); - final CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath); - try { + try (CompactionAdminClient admin = new CompactionAdminClient(jsc, cfg.basePath)) { final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration()); if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) { throw new IllegalStateException("Output File Path already exists"); @@ -101,8 +100,6 @@ public void run(JavaSparkContext jsc) throws Exception { default: throw new IllegalStateException("Not yet implemented !!"); } - } finally { - admin.close(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java index 1547403b43bdb..60cabe57d73b2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java @@ -83,7 +83,7 @@ public static class Config implements Serializable { public List configs = new ArrayList<>(); } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { final Config cfg = new Config(); JCommander cmd = new JCommander(cfg, null, args); if (cfg.help || args.length == 0) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java index f89aa78ea2d59..feb2c219a9200 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieWithTimelineServer.java @@ -42,30 +42,28 @@ public class HoodieWithTimelineServer implements Serializable { private final Config cfg; - private transient Javalin app = null; - public HoodieWithTimelineServer(Config cfg) { this.cfg = cfg; } 
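Editor's note: the HoodieCleaner and HoodieWithTimelineServer hunks above demote fields that only one method reads (the FileSystem and the Javalin app) to locals, and drop throws clauses for checked exceptions that are never raised. A small sketch of that before/after shape, using made-up names rather than the real Hudi classes:

import java.util.Properties;

public class CleanerSketch {

  private final String basePath;
  private final Properties props;   // still a field: read by later methods

  CleanerSketch(String basePath) {  // no "throws IOException": nothing here throws it
    this.basePath = basePath;
    // Previously a transient field; now a local because only the constructor needs it.
    String fs = resolveFileSystem(basePath);
    this.props = loadProps(fs);
  }

  void run() {                      // "throws Exception" removed for the same reason
    System.out.println("cleaning " + basePath + " with " + props);
  }

  private static String resolveFileSystem(String path) {
    return "fs-for-" + path;
  }

  private static Properties loadProps(String fs) {
    Properties p = new Properties();
    p.setProperty("fs", fs);
    return p;
  }

  public static void main(String[] args) {
    new CleanerSketch("/tmp/hoodie").run();
  }
}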
public static class Config implements Serializable { - @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false) + @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master") public String sparkMaster = null; @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = true) public String sparkMemory = null; - @Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions", required = false) + @Parameter(names = {"--num-partitions", "-n"}, description = "Num Partitions") public Integer numPartitions = 100; - @Parameter(names = {"--server-port", "-p"}, description = " Server Port", required = false) + @Parameter(names = {"--server-port", "-p"}, description = " Server Port") public Integer serverPort = 26754; - @Parameter(names = {"--delay-secs", "-d"}, description = "Delay(sec) before client connects", required = false) + @Parameter(names = {"--delay-secs", "-d"}, description = "Delay(sec) before client connects") public Integer delaySecs = 30; @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; } public void startService() { - app = Javalin.create().start(cfg.serverPort); + Javalin app = Javalin.create().start(cfg.serverPort); app.get("/", ctx -> ctx.result("Hello World")); } @@ -107,7 +105,7 @@ public String sendRequest(String driverHost, int port) throws RuntimeException { System.out.println("Response Code from(" + url + ") : " + response.getStatusLine().getStatusCode()); try (BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) { - StringBuffer result = new StringBuffer(); + StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index 4e0c86cf32430..3fe00ca0cbdcd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -167,9 +167,8 @@ private static SparkConf buildSparkConf(String appName, String defaultMaster, Ma sparkConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK"); - additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), e.getValue())); - SparkConf newSparkConf = HoodieWriteClient.registerClasses(sparkConf); - return newSparkConf; + additionalConfigs.forEach(sparkConf::set); + return HoodieWriteClient.registerClasses(sparkConf); } public static JavaSparkContext buildSparkContext(String appName, String defaultMaster, Map configs) { @@ -200,7 +199,7 @@ public static JavaSparkContext buildSparkContext(String appName, String sparkMas * @param parallelism Parallelism */ public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, String schemaStr, - int parallelism, Option compactionStrategyClass, TypedProperties properties) throws Exception { + int parallelism, Option compactionStrategyClass, TypedProperties properties) { HoodieCompactionConfig compactionConfig = compactionStrategyClass .map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false) .withCompactionStrategy(ReflectionUtils.loadClass(strategy)).build()) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java index eb3212fa03585..f67b62c795a19 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java @@ -50,7 +50,7 @@ public Compactor(HoodieWriteClient compactionClient, JavaSparkContext jssc) { public void compact(HoodieInstant instant) throws IOException { LOG.info("Compactor executing compaction " + instant); JavaRDD res = compactionClient.compact(instant.getTimestamp()); - long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count(); + long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count(); if (numWriteErrors != 0) { // We treat even a single error in compaction as fatal LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 2dd4138cf17a8..e608a6414d4be 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -376,8 +376,8 @@ private Option writeToSink(JavaRDD records, String checkpo throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation); } - long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue(); - long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue(); + long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue(); + long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue(); boolean hasErrors = totalErrorRecords > 0; long hiveSyncTimeMs = 0; if (!hasErrors || cfg.commitOnErrors) { @@ -414,10 +414,10 @@ private Option writeToSink(JavaRDD records, String checkpo } else { LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords); LOG.error("Printing out the top 100 errors"); - writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> { + writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> { LOG.error("Global error :", ws.getGlobalError()); if (ws.getErrors().size() > 0) { - ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + r.getKey() + " is " + r.getValue())); + ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + key + " is " + value)); } }); // Rolling back instant @@ -456,7 +456,7 @@ private String startCommit() { /** * Sync to Hive. */ - private void syncHive() throws ClassNotFoundException { + private void syncHive() { if (cfg.enableHiveSync) { HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath); LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). 
Hive metastore URL :" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index b8ff404c8b572..9c05f315cbd1b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -64,7 +64,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import java.util.stream.IntStream; /** @@ -440,7 +439,7 @@ protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) { HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true); List pending = CompactionUtils.getPendingCompactionInstantTimes(meta); - pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); + pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant)); asyncCompactService.start((error) -> { // Shutdown DeltaSync shutdown(false); @@ -554,29 +553,27 @@ private HoodieInstant fetchNextCompactionInstant() throws InterruptedException { @Override protected Pair startService() { ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction); - List> compactionFutures = - IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { - try { - // Set Compactor Pool Name for allowing users to prioritize compaction - LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); - jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); - - while (!isShutdownRequested()) { - final HoodieInstant instant = fetchNextCompactionInstant(); - if (null != instant) { - compactor.compact(instant); - } - } - LOG.info("Compactor shutting down properly!!"); - } catch (InterruptedException ie) { - LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie); - } catch (IOException e) { - LOG.error("Compactor executor failed", e); - throw new HoodieIOException(e.getMessage(), e); + return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> { + try { + // Set Compactor Pool Name for allowing users to prioritize compaction + LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME); + jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME); + + while (!isShutdownRequested()) { + final HoodieInstant instant = fetchNextCompactionInstant(); + if (null != instant) { + compactor.compact(instant); } - return true; - }, executor)).collect(Collectors.toList()); - return Pair.of(CompletableFuture.allOf(compactionFutures.stream().toArray(CompletableFuture[]::new)), executor); + } + LOG.info("Compactor shutting down properly!!"); + } catch (InterruptedException ie) { + LOG.warn("Compactor executor thread got interrupted exception. 
Stopping", ie); + } catch (IOException e) { + LOG.error("Compactor executor failed", e); + throw new HoodieIOException(e.getMessage(), e); + } + return true; + }, executor)).toArray(CompletableFuture[]::new)), executor); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index 19f8e10ab5bc7..a054b277a8d8f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -25,8 +25,8 @@ public class HoodieDeltaStreamerMetrics { - private HoodieWriteConfig config = null; - private String tableName = null; + private HoodieWriteConfig config; + private String tableName; public String overallTimerName = null; public String hiveSyncTimerName = null; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java index c0059ad4572ea..e98b86763a919 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java @@ -46,7 +46,7 @@ public class SchedulerConfGenerator { public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode"; public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file"; - private static String SPARK_SCHEDULING_PATTERN = + private static final String SPARK_SCHEDULING_PATTERN = "\n\n \n" + " %s\n %s\n %s\n" + " \n \n %s\n" diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index a31683b638496..deb26b57d52fe 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -36,7 +36,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import java.util.Arrays; +import java.util.Collections; public class HoodieIncrSource extends RowSource { @@ -87,9 +87,9 @@ public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, Sp @Override public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { - DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH)); + DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.HOODIE_SRC_BASE_PATH)); - /** + /* * DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH, * Config.HOODIE_SRC_PARTITION_FIELDS)); List partitionFields = * props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor @@ -121,7 +121,7 @@ public Pair>, String> fetchNextBatch(Option lastCkpt Dataset source = reader.load(srcPath); - /** + /* * log.info("Partition Fields are : (" + partitionFields + "). 
Initial Source Schema :" + source.schema()); * * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java index 9dd2c6a653005..988dbd822102d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java @@ -25,7 +25,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import java.io.IOException; import java.io.Serializable; /** @@ -80,7 +79,7 @@ private void initJsonConvertor() { } } - public GenericRecord fromJson(String json) throws IOException { + public GenericRecord fromJson(String json) { initSchema(); initJsonConvertor(); return jsonConverter.convert(json, schema); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java index cb0c82244ef0b..b358bd6f53df6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestTimestampBasedKeyGenerator.java @@ -33,13 +33,12 @@ import static org.junit.Assert.assertEquals; public class TestTimestampBasedKeyGenerator { - private Schema schema; private GenericRecord baseRecord; private TypedProperties properties = new TypedProperties(); @Before public void initialize() throws IOException { - schema = SchemaTestUtil.getTimestampEvolvedSchema(); + Schema schema = SchemaTestUtil.getTimestampEvolvedSchema(); baseRecord = SchemaTestUtil .generateAvroRecordFromJson(schema, 1, "001", "f1");
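Editor's note: the DeltaSync and HoodieWithTimelineServer hunks above replace lambdas such as ws -> ws.hasErrors() with method references, iterate a Map with forEach((key, value) -> ...) instead of entrySet().forEach, and build strings with StringBuilder rather than the synchronized StringBuffer. A compact, self-contained illustration; WriteResult is a made-up stand-in for WriteStatus.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MethodRefSketch {

  // Made-up stand-in for WriteStatus.
  static class WriteResult {
    private final Map<String, String> errors = new HashMap<>();

    boolean hasErrors() {
      return !errors.isEmpty();
    }

    Map<String, String> getErrors() {
      return errors;
    }
  }

  public static void main(String[] args) {
    WriteResult failed = new WriteResult();
    failed.getErrors().put("key-1", "timeout");
    List<WriteResult> statuses = Arrays.asList(new WriteResult(), failed);

    // Method reference instead of r -> r.hasErrors().
    long withErrors = statuses.stream().filter(WriteResult::hasErrors).count();

    // Map.forEach((k, v) -> ...) instead of entrySet().forEach(e -> ...).
    StringBuilder report = new StringBuilder("errors=").append(withErrors);
    failed.getErrors().forEach((key, value) ->
        report.append(", ").append(key).append(" -> ").append(value));

    System.out.println(report);
  }
}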
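Editor's note: the AsyncCompactService change above builds the array for CompletableFuture.allOf directly from the IntStream instead of collecting the futures into an intermediate list first. A small sketch of that shape, under the assumption that each worker only performs one unit of work; the names are illustrative, not the deltastreamer classes.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class AllOfSketch {

  public static void main(String[] args) {
    int workers = 2;
    ExecutorService executor = Executors.newFixedThreadPool(workers);

    // mapToObj(...).toArray(CompletableFuture[]::new) feeds allOf directly,
    // with no intermediate List<CompletableFuture<Boolean>> to collect and convert.
    CompletableFuture<Void> all = CompletableFuture.allOf(
        IntStream.range(0, workers)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
              System.out.println("worker " + i + " doing one unit of work");
              return true;
            }, executor))
            .toArray(CompletableFuture[]::new));

    all.join();
    executor.shutdown();
  }
}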