diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ef37dd18356b2..c200abee5e746 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -434,9 +434,9 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
         e.getValue().stream()).collect(Collectors.toList());
 
-    if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
       throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
     }
 
     final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 601d2ec8a7f45..46d2466c5cf58 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -53,6 +53,7 @@
 import org.apache.spark.broadcast.Broadcast;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,8 +78,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
     final SerializableSchema serializableSchema = new SerializableSchema(schema);
-    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup ->
-        ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList());
+    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());
 
     String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode");
     Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask);
@@ -121,7 +121,7 @@ private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clustering
 
     Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
     return StreamSupport.stream(writeStatusIterable.spliterator(), false)
-        .flatMap(writeStatusList -> writeStatusList.stream());
+        .flatMap(Collection::stream);
   }
 
@@ -152,7 +152,7 @@ private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOp
-      return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator();
+      return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
     }).collect(Collectors.toList());
 
     return new ConcatenatingIterator<>(iteratorsForPartition);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index acb6d82ae189c..6d819df3c2093 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -44,7 +44,7 @@ public SparkAllowUpdateStrategy(
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
-        .filter(f -> fileGroupsInPendingClustering.contains(f))
+        .filter(fileGroupsInPendingClustering::contains)
         .collect(Collectors.toSet());
     return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index a6d03eae2b361..4c4200d9ba35d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -71,8 +72,7 @@ public static void runValidators(HoodieWriteConfig config,
     if (!writeMetadata.getWriteStats().isPresent()) {
       writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
     }
-    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats ->
-        writeStats.getPartitionPath()).collect(Collectors.toSet());
+    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
     SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
     // Refresh timeline to ensure validator sees the any other operations done on timeline (async operations such as other clustering/compaction/rollback)
     table.getMetaClient().reloadActiveTimeline();
@@ -80,11 +80,9 @@ public static void runValidators(HoodieWriteConfig config,
     Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache();
 
     Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
-        .map(validatorClass -> {
-          return ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
-              new Class[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
-              table, context, config));
-        });
+        .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
+            new Class[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
+            table, context, config)));
 
     boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join)
         .reduce(true, Boolean::logicalAnd);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
index e23723ac721b2..7b644938bb9f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketPartitioner.java
@@ -266,9 +266,7 @@ private JavaRDD<HoodieRecord<T>> doPartitionAndSortByRecordKey(JavaRDD<HoodieRec
-    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> {
-      return t1.getRecordKey().compareTo(t2.getRecordKey());
-    };
+    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> t1.getRecordKey().compareTo(t2.getRecordKey());
 
     return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
         .repartitionAndSortWithinPartitions(partitioner, comparator)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index f99bf876c93c8..16f47b8f8c0d8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -427,7 +427,7 @@ Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> wr
     // Map each fileId that has inserts to a unique partition Id. This will be used while
     // repartitioning RDD
     final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
-        .map(w -> w.getFileId()).collect();
+        .map(WriteStatus::getFileId).collect();
     for (final String fileId : fileIds) {
       fileIdPartitionMap.put(fileId, partitionIndex++);
     }
@@ -445,7 +445,7 @@ public HoodieData<WriteStatus> updateLocation(
         writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
             .partitionBy(new WriteStatusPartitioner(fileIdPartitionMap, this.numWriteStatusWithInserts))
-            .map(w -> w._2());
+            .map(Tuple2::_2);
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
     JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
index 60c32b34da2a8..ca01def803667 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -52,12 +52,12 @@ public void clear() {
 
   @Override
   public void increment(String name) {
-    counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name, 1L, Long::sum);
   }
 
   @Override
   public void add(String name, long value) {
-    counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name, value, Long::sum);
   }
 
   @Override
@@ -80,13 +80,13 @@ public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
 
   @Override
   public void add(Map<String, Long> arg) {
-    arg.forEach((key, value) -> add(key, value));
+    arg.forEach(this::add);
   }
 
   @Override
   public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
     DistributedRegistry registry = new DistributedRegistry(name);
-    counters.forEach((key, value) -> registry.add(key, value));
+    counters.forEach(registry::add);
     return registry;
   }
 
@@ -97,7 +97,7 @@ public boolean isZero() {
 
   @Override
   public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
-    acc.value().forEach((key, value) -> add(key, value));
+    acc.value().forEach(this::add);
   }
 
   @Override
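
Every hunk in this patch applies the same mechanical refactoring: a lambda that only forwards to an existing method is replaced by the equivalent method reference (`Long::sum`, `HoodieWriteStat::getFileId`, `this::add`, `registry::add`, and so on). The sketch below is illustrative only and does not use Hudi classes; the `Stat` record, the counter map, and the sample values are hypothetical stand-ins for the patched code paths (records require Java 16+).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MethodReferenceSketch {
  // Hypothetical stand-in for a write statistic carrying a file id and an error count.
  record Stat(String fileId, long errors) {}

  private final Map<String, Long> counters = new HashMap<>();

  // Target of the bound references below; uses a static method reference for the merge
  // function, mirroring counters.merge(name, value, Long::sum) in the DistributedRegistry hunks.
  public void add(String name, long value) {
    counters.merge(name, value, Long::sum);
  }

  public static void main(String[] args) {
    List<Stat> stats = List.of(new Stat("f1", 0), new Stat("f2", 3));

    // Unbound instance-method reference, mirroring map(HoodieWriteStat::getFileId).
    String failedFiles = stats.stream()
        .filter(s -> s.errors() > 0)
        .map(Stat::fileId)
        .collect(Collectors.joining(","));

    // Bound instance-method references, mirroring arg.forEach(this::add)
    // and counters.forEach(registry::add).
    MethodReferenceSketch registry = new MethodReferenceSketch();
    Map.of("writes", 2L, "errors", 3L).forEach(registry::add);

    System.out.println("failed: " + failedFiles + ", counters: " + registry.counters);
  }
}
```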