@@ -434,9 +434,9 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
         e.getValue().stream()).collect(Collectors.toList());
 
-    if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
+    if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
       throw new HoodieClusteringException("Clustering failed to write to files:"
-          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
+          + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
     }
 
     final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
@@ -53,6 +53,7 @@
 import org.apache.spark.broadcast.Broadcast;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -77,8 +78,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
     JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
     final TaskContextSupplier taskContextSupplier = getEngineContext().getTaskContextSupplier();
     final SerializableSchema serializableSchema = new SerializableSchema(schema);
-    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(clusteringGroup ->
-        ClusteringGroupInfo.create(clusteringGroup)).collect(Collectors.toList());
+    final List<ClusteringGroupInfo> clusteringGroupInfos = clusteringPlan.getInputGroups().stream().map(ClusteringGroupInfo::create).collect(Collectors.toList());
 
     String umask = engineContext.hadoopConfiguration().get("fs.permissions.umask-mode");
     Broadcast<String> umaskBroadcastValue = engineContext.broadcast(umask);
@@ -121,7 +121,7 @@ private Stream<WriteStatus> runClusteringForGroup(ClusteringGroupInfo clustering
 
     Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
     return StreamSupport.stream(writeStatusIterable.spliterator(), false)
-        .flatMap(writeStatusList -> writeStatusList.stream());
+        .flatMap(Collection::stream);
   }
 
 
@@ -152,7 +152,7 @@ private Iterator<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOp
         }
       };
 
-      return StreamSupport.stream(indexedRecords.spliterator(), false).map(record -> transform(record)).iterator();
+      return StreamSupport.stream(indexedRecords.spliterator(), false).map(this::transform).iterator();
     }).collect(Collectors.toList());
 
     return new ConcatenatingIterator<>(iteratorsForPartition);
@@ -44,7 +44,7 @@ public SparkAllowUpdateStrategy(
   public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
     List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
     Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
-        .filter(f -> fileGroupsInPendingClustering.contains(f))
+        .filter(fileGroupsInPendingClustering::contains)
         .collect(Collectors.toSet());
     return Pair.of(taggedRecordsRDD, fileGroupIdsWithUpdatesAndPendingClustering);
   }
@@ -24,6 +24,7 @@
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -71,20 +72,17 @@ public static void runValidators(HoodieWriteConfig config,
     if (!writeMetadata.getWriteStats().isPresent()) {
       writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
     }
-    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats ->
-        writeStats.getPartitionPath()).collect(Collectors.toSet());
+    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
     SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
     // Refresh the timeline to ensure the validator sees any other operations done on the timeline (async operations such as other clustering/compaction/rollback)
     table.getMetaClient().reloadActiveTimeline();
     Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table).cache();
     Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache();
 
     Stream<SparkPreCommitValidator> validators = Arrays.stream(config.getPreCommitValidators().split(","))
-        .map(validatorClass -> {
-          return ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
-              new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
-              table, context, config));
-        });
+        .map(validatorClass -> ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
+            new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
+            table, context, config)));
 
     boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join)
         .reduce(true, Boolean::logicalAnd);
@@ -266,9 +266,7 @@ private JavaRDD<HoodieRecord<T>> doPartitionAndSortByRecordKey(JavaRDD<HoodieRec
       LOG.warn("Consistent bucket does not support global sort mode, the sort will only be done within each data partition");
     }
 
-    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> {
-      return t1.getRecordKey().compareTo(t2.getRecordKey());
-    };
+    Comparator<HoodieKey> comparator = (Comparator<HoodieKey> & Serializable) (t1, t2) -> t1.getRecordKey().compareTo(t2.getRecordKey());
 
     return records.mapToPair(record -> new Tuple2<>(record.getKey(), record))
         .repartitionAndSortWithinPartitions(partitioner, comparator)
@@ -427,7 +427,7 @@ Map<String, Integer> mapFileWithInsertsToUniquePartition(JavaRDD<WriteStatus> wr
     // Map each fileId that has inserts to a unique partition Id. This will be used while
     // repartitioning RDD<WriteStatus>
     final List<String> fileIds = writeStatusRDD.filter(w -> w.getStat().getNumInserts() > 0)
-        .map(w -> w.getFileId()).collect();
+        .map(WriteStatus::getFileId).collect();
     for (final String fileId : fileIds) {
       fileIdPartitionMap.put(fileId, partitionIndex++);
     }
@@ -445,7 +445,7 @@ public HoodieData<WriteStatus> updateLocation(
         writeStatusRDD.mapToPair(w -> new Tuple2<>(w.getFileId(), w))
             .partitionBy(new WriteStatusPartitioner(fileIdPartitionMap,
                 this.numWriteStatusWithInserts))
-            .map(w -> w._2());
+            .map(Tuple2::_2);
     JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
     acquireQPSResourcesAndSetBatchSize(desiredQPSFraction, jsc);
     JavaRDD<WriteStatus> writeStatusJavaRDD = partitionedRDD.mapPartitionsWithIndex(updateLocationFunction(),
@@ -52,12 +52,12 @@ public void clear() {
 
   @Override
   public void increment(String name) {
-    counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name, 1L, Long::sum);
   }
 
   @Override
   public void add(String name, long value) {
-    counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue);
+    counters.merge(name, value, Long::sum);
   }
 
   @Override
@@ -80,13 +80,13 @@ public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
 
   @Override
   public void add(Map<String, Long> arg) {
-    arg.forEach((key, value) -> add(key, value));
+    arg.forEach(this::add);
   }
 
   @Override
   public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
     DistributedRegistry registry = new DistributedRegistry(name);
-    counters.forEach((key, value) -> registry.add(key, value));
+    counters.forEach(registry::add);
     return registry;
   }
 
@@ -97,7 +97,7 @@ public boolean isZero() {
 
   @Override
   public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
-    acc.value().forEach((key, value) -> add(key, value));
+    acc.value().forEach(this::add);
   }
 
   @Override
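Every hunk above applies the same mechanical refactor: a lambda that only forwards its arguments to a single method is replaced by the equivalent method reference. The standalone sketch below is not part of the PR; the class name MethodReferenceSketch and its members are illustrative only, but it shows both forms side by side and compiles on its own.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class MethodReferenceSketch {
  private final Map<String, Long> counters = new ConcurrentHashMap<>();

  // Lambda form: counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue);
  // Method-reference form, the same shape as the DistributedRegistry change above:
  public void increment(String name) {
    counters.merge(name, 1L, Long::sum);
  }

  // Lambda form: .map(s -> s.toUpperCase())
  // Unbound instance-method reference, the same shape as the getFileId/getPartitionPath changes above:
  public List<String> upperCase(List<String> values) {
    return values.stream().map(String::toUpperCase).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    MethodReferenceSketch sketch = new MethodReferenceSketch();
    sketch.increment("writes");
    System.out.println(sketch.counters);                           // {writes=1}
    System.out.println(sketch.upperCase(Arrays.asList("a", "b"))); // [A, B]
  }
}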